Публичный аккаунт WeChat: маленький черный домовладелец
Поскольку дорога идет далеко, будущее будет лучше
Обучение безгранично, давайте работать вместе!
В предыдущей статье я проанализировал, как понижается версия Dubbo.Помимо понижения, иногда ограничение тока также является очень эффективным решением проблем с высокой производительностью параллелизма.В этой статье я начну анализировать, как Dubbo ограничен в настоящее время. Мы знаем, что текущее ограничение в основном достигается за счет контроля количества подключений, чтобы предотвратить слишком большую обработку запросов в определенном сегменте, что приведет к сбою важных служб.
Вариант использования
контроль соединения с сервером
Ограничить текущего провайдера до 10 потребительских ссылок с использованием протокола dubbo.
<dubbo:provider protocol="dubbo" accepts="10" />
или
<dubbo:protocol name="dubbo" accepts="10" />
управление параллелизмом
ограничениеcom.foo.BarService
Для каждого метода одновременное выполнение сервера (или количество потоков, занимающих пул потоков) не может превышать 10:
<dubbo:service interface="com.foo.BarService" executes="10" />
ограничениеcom.foo.BarService
изsayHello
метод, одновременное выполнение сервера (или количество потоков, занимающих пул потоков) не может превышать 10.
<dubbo:service interface="com.foo.BarService">
<dubbo:method name="sayHello" executes="10" />
</dubbo:service>
активное ограничение тока
Этот метод ограничения тока отличается от первых двух, которые можно установить на стороне поставщика или на стороне потребителя. Может быть установлен на уровне интерфейса или на уровне метода.
Он также имеет разные значения в зависимости от типа соединения, которое потребитель установил с провайдером.
长连接
: указывает максимальное количество запросов, которые может обработать текущее постоянное соединение. Нет проблем с количеством длинных подключений.短连接
: указывает количество коротких подключений, которые текущая служба может обрабатывать одновременно.
уровень класса
<dubbo:service interface="com.foo.BarService" actives="10" />
<dubbo:reference interface="com.foo.BarService" actives="10" />
уровень метода
<dubbo:reference interface="com.foo.BarService">
<dubbo:method name="sayHello" actives="10" />
</dubbo:service>
<dubbo:reference interface="com.foo.BarService">
<dubbo:method name="sayHello" actives="10" />
</dubbo:service>
ограничение тока соединений
Он может быть установлен на стороне поставщика или на стороне потребителя. Ограничьте количество подключений. Для коротких подключений, как и для активных. Но для длинных соединений он указывает количество длинных соединений.
Как правило, соединения используются вместе с активами, так что соединения ограничивают количество длинных соединений, а активы ограничивают количество запросов, которые могут быть обработаны в длинном соединении.
Ограничьте использование клиентских служб не более чем 10 подключениями.
<dubbo:reference interface="com.foo.BarService" connections="10" />
или
<dubbo:service interface="com.foo.BarService" connections="10" />
если<dubbo:service>
а также<dubbo:reference>
Соединения настроены,<dubbo:reference>
приоритет.
Задерживается
Ленивые соединения могут быть установлены только на стороне потребителя и не могут быть установлены на уровне метода. Работает только с протоколом раскрытия службы Dubbo. Отложите установление постоянных соединений до тех пор, пока потребитель не позвонит поставщику. Количество длинных соединений может быть уменьшено.
<!--设置当前消费者对接口中的每个方法发出链接采用延迟加载-->
<dubbo:reference id="userService" lazy="true"
interface="com.dubbo.service.UserService"/>
<!--设置当前消费者对所有接口中的所有方法发出链接采用延迟加载-->
<dubbo:consumer lazy="true"></dubbo:consumer>
Мы уже объяснили, как установить количество управляющих ссылок, так как же они реализованы внизу?
Анализ исходного кода
По сути, приведенная выше логика является фильтром, все фильтры будут связаны в цепочку фильтров, каждый запрос будет проходить через каждый фильтр во всей цепочке. Итак, когда он построен как цепочка фильтров?
Вызывается при раскрытии службыbuildInvokerChain
, действительно будет выполнятьinvoker
Поместите его в конец цепочки фильтров, а затем выполнитеprotocol.expert(buildInvokerChain(invoker, ...))
Метод заключается в выполнении сервисного воздействия.
Вызывается при ссылке на службуprotocol.refer()
метод первыйInvoker
, затем позвонитеbuildInvokerChain(protocol.refer(type, url), ...)
генерировать цепочку вызовов типа потребления.
ExecuteLimitFilter
Он используется для ограничения максимального количества одновременно используемых методов в каждой службе, и существуют методы настройки уровня интерфейса и уровня метода.
public class ExecuteLimitFilter extends ListenableFilter {
private static final String EXECUTELIMIT_FILTER_START_TIME = "execugtelimit_filter_start_time";
public ExecuteLimitFilter() {
super.listener = new ExecuteLimitListener();
}
@Override
public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
URL url = invoker.getUrl();
String methodName = invocation.getMethodName();
int max = url.getMethodParameter(methodName, EXECUTES_KEY, 0);
if (!RpcStatus.beginCount(url, methodName, max)) {
throw new RpcException("Failed to invoke method " + invocation.getMethodName() + " in provider " +
url + ", cause: The service using threads greater than <dubbo:service executes=\"" + max +
"\" /> limited.");
}
invocation.setAttachment(EXECUTELIMIT_FILTER_START_TIME, String.valueOf(System.currentTimeMillis()));
try {
return invoker.invoke(invocation);
} catch (Throwable t) {
if (t instanceof RuntimeException) {
throw (RuntimeException) t;
} else {
throw new RpcException("unexpected exception when ExecuteLimitFilter", t);
}
}
}
static class ExecuteLimitListener implements Listener {
@Override
public void onResponse(Result appResponse, Invoker<?> invoker, Invocation invocation) {
RpcStatus.endCount(invoker.getUrl(), invocation.getMethodName(), getElapsed(invocation), true);
}
@Override
public void onError(Throwable t, Invoker<?> invoker, Invocation invocation) {
RpcStatus.endCount(invoker.getUrl(), invocation.getMethodName(), getElapsed(invocation), false);
}
private long getElapsed(Invocation invocation) {
String beginTime = invocation.getAttachment(EXECUTELIMIT_FILTER_START_TIME);
return StringUtils.isNotEmpty(beginTime) ? System.currentTimeMillis() - Long.parseLong(beginTime) : 0;
}
}
}
public class RpcStatus {
private static final ConcurrentMap<String, RpcStatus> SERVICE_STATISTICS = new ConcurrentHashMap<String, RpcStatus>();
private static final ConcurrentMap<String, ConcurrentMap<String, RpcStatus>> METHOD_STATISTICS = new ConcurrentHashMap<String, ConcurrentMap<String, RpcStatus>>();
private final ConcurrentMap<String, Object> values = new ConcurrentHashMap<String, Object>();
private final AtomicInteger active = new AtomicInteger();
private final AtomicLong total = new AtomicLong();
private final AtomicInteger failed = new AtomicInteger();
private final AtomicLong totalElapsed = new AtomicLong();
private final AtomicLong failedElapsed = new AtomicLong();
private final AtomicLong maxElapsed = new AtomicLong();
private final AtomicLong failedMaxElapsed = new AtomicLong();
private final AtomicLong succeededMaxElapsed = new AtomicLong();
//......
public static void beginCount(URL url, String methodName) {
beginCount(url, methodName, Integer.MAX_VALUE);
}
public static boolean beginCount(URL url, String methodName, int max) {
max = (max <= 0) ? Integer.MAX_VALUE : max;
RpcStatus appStatus = getStatus(url);
RpcStatus methodStatus = getStatus(url, methodName);
if (methodStatus.active.incrementAndGet() > max) {
methodStatus.active.decrementAndGet();
return false;
} else {
appStatus.active.incrementAndGet();
return true;
}
}
public static void endCount(URL url, String methodName, long elapsed, boolean succeeded) {
endCount(getStatus(url), elapsed, succeeded);
endCount(getStatus(url, methodName), elapsed, succeeded);
}
private static void endCount(RpcStatus status, long elapsed, boolean succeeded) {
status.active.decrementAndGet();
status.total.incrementAndGet();
status.totalElapsed.addAndGet(elapsed);
if (status.maxElapsed.get() < elapsed) {
status.maxElapsed.set(elapsed);
}
if (succeeded) {
if (status.succeededMaxElapsed.get() < elapsed) {
status.succeededMaxElapsed.set(elapsed);
}
} else {
status.failed.incrementAndGet();
status.failedElapsed.addAndGet(elapsed);
if (status.failedMaxElapsed.get() < elapsed) {
status.failedMaxElapsed.set(elapsed);
}
}
}
//......
}
Основной принцип: в кадровом буфере ConcularentMap счетчики. Количество одновременного URL генерирует идентификационную систему для каждого запроса, а в качестве ключа; идентификацияSString затем генерирует RPCStatus каждая цель, как это значение. Объекты RPCStatus для записи, соответствующей количеству одновременного. Перед вызовом запуска объект будет получать RPCSTATUS по его URL-адресу, атомное количество одновременных +1 счетчика счетчика, а затем, наконец, в атоме минус 1. До тех пор, пока счетчик времени +1, обнаружил, что текущая настройка счетчика больше, чем количество одновременно, он будет выбрасывать исключение.
TpsLimitFilter
public class DefaultTPSLimiter implements TPSLimiter {
/**
* 每个Service维护一个计数器
*/
private final ConcurrentMap<String, StatItem> stats
= new ConcurrentHashMap<String, StatItem>();
@Override
public boolean isAllowable(URL url, Invocation invocation) {
int rate = url.getParameter(Constants.TPS_LIMIT_RATE_KEY, -1);
long interval = url.getParameter(Constants.TPS_LIMIT_INTERVAL_KEY,
Constants.DEFAULT_TPS_LIMIT_INTERVAL);
//servicekey并没有和方法绑定,只能限流接口
String serviceKey = url.getServiceKey();
if (rate > 0) {
StatItem statItem = stats.get(serviceKey);
if (statItem == null) {
stats.putIfAbsent(serviceKey,
new StatItem(serviceKey, rate, interval));
statItem = stats.get(serviceKey);
}
return statItem.isAllowable();
} else {
StatItem statItem = stats.get(serviceKey);
if (statItem != null) {
stats.remove(serviceKey);
}
}
return true;
}
}
class StatItem {
//接口名
private String name;
//计数周期开始
private long lastResetTime;
//计数间隔
private long interval;
//剩余计数请求数
private AtomicInteger token;
//总共允许请求数
private int rate;
StatItem(String name, int rate, long interval) {
this.name = name;
this.rate = rate;
this.interval = interval;
this.lastResetTime = System.currentTimeMillis();
this.token = new AtomicInteger(rate);
}
public boolean isAllowable() {
long now = System.currentTimeMillis();
if (now > lastResetTime + interval) {
token.set(rate);
lastResetTime = now;
}
int value = token.get();
boolean flag = false;
while (value > 0 && !flag) {
//乐观锁增加计数
flag = token.compareAndSet(value, value - 1);
//失败重新获取
value = token.get();
}
return flag;
}
long getLastResetTime() {
return lastResetTime;
}
int getToken() {
return token.get();
}
@Override
public String toString() {
return new StringBuilder(32).append("StatItem ")
.append("[name=").append(name).append(", ")
.append("rate = ").append(rate).append(", ")
.append("interval = ").append(interval).append("]")
.toString();
}
}
Текущее ограничение TpsLimitFilter основано на токенах, то есть за период времени выделяется только N токенов, и каждый запрос будет потреблять один токен.Когда он будет исчерпан, последующие запросы будут отклонены.
Конкретная логика вDefaultTPSLimiter#isAllowable
, будет использовать этот метод, чтобы определить, активировано ли текущее ограничение.
Внутри DefaultTPSLimiter ConcurrentHashMap используется для кэширования количества токенов каждого интерфейса, ключом является интерфейс+группа+версия, а значением является объект StatItem, который охватывает интервал обновления токена, количество токенов, выдаваемых каждый раз, и т.п. Во-первых, определить, превышает ли текущее время минус время выпуска последнего токена временной интервал, если он превышает, то токен будет перевыпущен, а оставшиеся токены будут напрямую перезаписаны. Затем CAS вычитает 1 токен, и если он меньше 0, сработает текущий лимит.
ActiveLimitFilter
и поставщика услугExecuteLimitFilter
Точно так же это фильтр на стороне потребителя, ограничивающий количество параллелизма на стороне клиента.
Но этоExecuteLimitFilter
Разница в том, что он не генерирует исключение напрямую. Но при достижении порога он сначала заблокирует и выгрузит объект RpcStatus текущего интерфейса, а затем будет ждать через метод ожидания.Время ждать есть, потому что запрос есть.timeout
атрибут. Затем, если Invoker одновременно уменьшает счетчик на -1 и запускает уведомление после завершения вызова, поток в состоянии ожидания будет пробужден и продолжит выполнение, чтобы определить, истекло ли время ожидания, а если время ожидания истекло, возникнет исключение. быть брошенным. Если текущее количество параллелизма все еще превышает пороговое значение, продолжайте выполнение метода ожидания, если оно не превышает пороговое значение, выйдите из цикла, CAS+1 и вызовите метод вызова.После завершения вызова метод CAS-1 и, наконец, разбудить другой поток через уведомление.
public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
URL url = invoker.getUrl();
String methodName = invocation.getMethodName();
// 获取配置的数量
int max = invoker.getUrl().getMethodParameter(methodName, Constants.ACTIVES_KEY, 0);
// 获取当前接口调用统计
RpcStatus count = RpcStatus.getStatus(invoker.getUrl(), invocation.getMethodName());
if (max > 0) {
//获取接口超时时间
long timeout = invoker.getUrl().getMethodParameter(invocation.getMethodName(), Constants.TIMEOUT_KEY, 0);
long start = System.currentTimeMillis();
long remain = timeout;
int active = count.getActive();// 获取并发数
if (active >= max) {// 如果大于最大数量
synchronized (count) {
while ((active = count.getActive()) >= max) {
try {
// 挂起当前线程并释放锁,因为并发数已超过限制
count.wait(remain);
} catch (InterruptedException e) {
}
// 通过notify唤醒了,计算挂起的时间
long elapsed = System.currentTimeMillis() - start;
remain = timeout - elapsed;
if (remain <= 0) {// 如果已经超过超时时间
throw new RpcException("Waiting concurrent invoke timeout in client-side for service: "
+ invoker.getInterface().getName() + ", method: "
+ invocation.getMethodName() + ", elapsed: " + elapsed
+ ", timeout: " + timeout + ". concurrent invokes: " + active
+ ". max concurrent invoke limit: " + max);
}
}
}
}
}
try {
long begin = System.currentTimeMillis();
// 增加该方法的数量
RpcStatus.beginCount(url, methodName);
try {
// 调用
Result result = invoker.invoke(invocation);
// 减少数量
RpcStatus.endCount(url, methodName, System.currentTimeMillis() - begin, true);
return result;
} catch (RuntimeException t) {
RpcStatus.endCount(url, methodName, System.currentTimeMillis() - begin, false);
throw t;
}
} finally {
if(max>0){// 调用完成后调用notify唤醒在等待的线程
synchronized (count) {
count.notify();
}
}
}
}
Справочная статья:
Текущий ограничивающий анализ исходного кода Dubbo TpsLimitFilter
Текущий лимит сервиса Dubbo
Анализ исходного кода Dubbo ---- фильтр ActiveLimitFilter