Параметры и принципы настройки производительности Dubbo
- 1 модель звонка Даббо
- 2 Общие параметры настройки производительности
- 3 Исходный код и принципиальный анализ
1 модель звонка Даббо
2 Общие параметры настройки производительности
имя параметра | Объем | По умолчанию | иллюстрировать | Примечание |
---|---|---|---|---|
threads | provider | 200 | Размер пула потоков бизнес-процессов | |
iothreads | provider | Количество процессоров + 1 | размер пула потоков ввода-вывода | |
queues | provider | 0 | Размер очереди пула потоков. Когда пул потоков заполнен, размер очереди для постановки в очередь на выполнение не рекомендуется устанавливать. требования. | |
connections | consumer | 0 | Для максимального количества соединений на одного провайдера протоколы коротких соединений, такие как rmi, http, hessian, указывают предел количества соединений, а протоколы длинных соединений, такие как Dubbo, указывают количество установленных длинных соединений. | Протокол Dubbo использует постоянное соединение по умолчанию. |
actives | consumer | 0 | Максимальное количество одновременных вызовов на метод на потребителя службы на службу | 0 означает отсутствие ограничений |
accepts | provider | 0 | Максимальное количество подключений, которое может принять поставщик услуг | 0 означает отсутствие ограничений |
executes | provider | 0 | Максимальное количество одновременно выполняемых запросов на метод для каждого поставщика услуг | 0 означает отсутствие ограничений |
3 Исходный код и принципиальный анализ
3.1 threads
FixedThreadPool.java
public Executor getExecutor(URL url) {
String name = url.getParameter(Constants.THREAD_NAME_KEY, Constants.DEFAULT_THREAD_NAME);
int threads = url.getParameter(Constants.THREADS_KEY, Constants.DEFAULT_THREADS);
int queues = url.getParameter(Constants.QUEUES_KEY, Constants.DEFAULT_QUEUES);
return new ThreadPoolExecutor(threads, threads, 0, TimeUnit.MILLISECONDS,
queues == 0 ? new SynchronousQueue<Runnable>() :
(queues < 0 ? new LinkedBlockingQueue<Runnable>() :
new LinkedBlockingQueue<Runnable>(queues)),
new NamedThreadFactory(name, true), new AbortPolicyWithReport(name, url));
}
LimitedThreadPool.java
public Executor getExecutor(URL url) {
String name = url.getParameter(Constants.THREAD_NAME_KEY, Constants.DEFAULT_THREAD_NAME);
int cores = url.getParameter(Constants.CORE_THREADS_KEY, Constants.DEFAULT_CORE_THREADS);
int threads = url.getParameter(Constants.THREADS_KEY, Constants.DEFAULT_THREADS);
int queues = url.getParameter(Constants.QUEUES_KEY, Constants.DEFAULT_QUEUES);
return new ThreadPoolExecutor(cores, threads, Long.MAX_VALUE, TimeUnit.MILLISECONDS,
queues == 0 ? new SynchronousQueue<Runnable>() :
(queues < 0 ? new LinkedBlockingQueue<Runnable>() :
new LinkedBlockingQueue<Runnable>(queues)),
new NamedThreadFactory(name, true), new AbortPolicyWithReport(name, url));
}
где Константы.DEFAULT_QUEUES = 200. Параметр threads настраивает максимальное (или основное) количество потоков в пуле потоков бизнес-процессов.
3.2 iothreads
NettyServer.java
Override
protected void doOpen() throws Throwable {
NettyHelper.setNettyLoggerFactory();
ExecutorService boss = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerBoss", true));
ExecutorService worker = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerWorker", true));
ChannelFactory channelFactory = new NioServerSocketChannelFactory(boss, worker, getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS));
bootstrap = new ServerBootstrap(channelFactory);
final NettyHandler nettyHandler = new NettyHandler(getUrl(), this);
channels = nettyHandler.getChannels();
bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
public ChannelPipeline getPipeline() {
NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec() ,getUrl(), NettyServer.this);
ChannelPipeline pipeline = Channels.pipeline();
pipeline.addLast("decoder", adapter.getDecoder());
pipeline.addLast("encoder", adapter.getEncoder());
pipeline.addLast("handler", nettyHandler);
return pipeline;
}
});
// bind
channel = bootstrap.bind(getBindAddress());
}
3.3 queues
Они используются соответственно в FixedThreadPool.java, LimitedThreadPool.java и CachedThreadPool.java Подробнее о коде см. Раздел 3.2. Из кода видно, что значение по умолчанию равно 0, что означает, что используется синхронная блокирующая очередь, если для queues установлено значение меньше 0, используется блокирующая очередь связанного списка с емкостью Integer.MAX_VALUE; если это другие значения, используется блокирующая очередь связанного списка указанного размера.
3.4 connections
DubboProtocol.java
private ExchangeClient[] getClients(URL url){
//是否共享连接
boolean service_share_connect = false;
int connections = url.getParameter(Constants.CONNECTIONS_KEY, 0);
//如果connections不配置,则共享连接,否则每服务每连接
if (connections == 0){
service_share_connect = true;
connections = 1;
}
ExchangeClient[] clients = new ExchangeClient[connections];
for (int i = 0; i < clients.length; i++) {
if (service_share_connect){
clients[i] = getSharedClient(url);
} else {
clients[i] = initClient(url);
}
}
return clients;
}
DubboInvoker.java
Override
protected Result doInvoke(final Invocation invocation) throws Throwable {
RpcInvocation inv = (RpcInvocation) invocation;
final String methodName = RpcUtils.getMethodName(invocation);
inv.setAttachment(Constants.PATH_KEY, getUrl().getPath());
inv.setAttachment(Constants.VERSION_KEY, version);
ExchangeClient currentClient;
if (clients.length == 1) {
currentClient = clients[0];
} else {
currentClient = clients[index.getAndIncrement() % clients.length];
}
try {
boolean isAsync = RpcUtils.isAsync(getUrl(), invocation);
boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
int timeout = getUrl().getMethodParameter(methodName, Constants.TIMEOUT_KEY,Constants.DEFAULT_TIMEOUT);
if (isOneway) {
boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
currentClient.send(inv, isSent);
RpcContext.getContext().setFuture(null);
return new RpcResult();
} else if (isAsync) {
ResponseFuture future = currentClient.request(inv, timeout) ;
RpcContext.getContext().setFuture(new FutureAdapter<Object>(future));
return new RpcResult();
} else {
RpcContext.getContext().setFuture(null);
return (Result) currentClient.request(inv, timeout).get();
}
} catch (TimeoutException e) {
throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
} catch (RemotingException e) {
throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
}
}
Как видно из вышеизложенного, значение по умолчанию равно 0, что означает, что для каждого провайдера все клиенты совместно используют постоянное соединение, в противном случае устанавливается заданное количество постоянных соединений. При вызове, если есть несколько длинных соединений, используйте метод опроса для получения длинного соединения.
3.5 actives
ActiveLimitFilter.java
public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
URL url = invoker.getUrl();
String methodName = invocation.getMethodName();
int max = invoker.getUrl().getMethodParameter(methodName, Constants.ACTIVES_KEY, 0);
RpcStatus count = RpcStatus.getStatus(invoker.getUrl(), invocation.getMethodName());
if (max > 0) {
long timeout = invoker.getUrl().getMethodParameter(invocation.getMethodName(), Constants.TIMEOUT_KEY, 0);
long start = System.currentTimeMillis();
long remain = timeout;
int active = count.getActive();
if (active >= max) {
synchronized (count) {
while ((active = count.getActive()) >= max) {
try {
count.wait(remain);
} catch (InterruptedException e) {
}
long elapsed = System.currentTimeMillis() - start;
remain = timeout - elapsed;
if (remain <= 0) {
throw new RpcException("Waiting concurrent invoke timeout in client-side for service: "
+ invoker.getInterface().getName() + ", method: "
+ invocation.getMethodName() + ", elapsed: " + elapsed
+ ", timeout: " + timeout + ". concurrent invokes: " + active
+ ". max concurrent invoke limit: " + max);
}
}
}
}
}
try {
long begin = System.currentTimeMillis();
RpcStatus.beginCount(url, methodName);
try {
Result result = invoker.invoke(invocation);
RpcStatus.endCount(url, methodName, System.currentTimeMillis() - begin, true);
return result;
} catch (RuntimeException t) {
RpcStatus.endCount(url, methodName, System.currentTimeMillis() - begin, false);
throw t;
}
} finally {
if(max>0){
synchronized (count) {
count.notify();
}
}
}
}
При вызове Потребителя подсчитываются вызовы измерения службы и метода.Если количество одновременных вызовов превышает установленное максимальное значение, текущий поток будет заблокирован до завершения обработки предыдущего запроса.
3.6 accepts
AbstractServer.java
@Override
public void connected(Channel ch) throws RemotingException {
Collection<Channel> channels = getChannels();
if (accepts > 0 && channels.size() > accepts) {
logger.error("Close channel " + ch + ", cause: The server " + ch.getLocalAddress() + " connections greater than max config " + accepts);
ch.close();
return;
}
super.connected(ch);
}
Когда количество соединений превышает максимальное значение, текущее соединение закрывается.
3.7 executes
ExecuteLimitFilter.jvava
public Result invokeOrg(Invoker<?> invoker, Invocation invocation) throws RpcException {
URL url = invoker.getUrl();
String methodName = invocation.getMethodName();
int max = url.getMethodParameter(methodName, Constants.EXECUTES_KEY, 0);
if (max > 0) {
RpcStatus count = RpcStatus.getStatus(url, invocation.getMethodName());
if (count.getActive() >= max) {
throw new RpcException("Failed to invoke method " + invocation.getMethodName() + " in provider " + url + ", cause: The service using threads greater than <dubbo:service executes=\"" + max + "\" /> limited.");
}
}
long begin = System.currentTimeMillis();
boolean isException = false;
RpcStatus.beginCount(url, methodName);
try {
Result result = invoker.invoke(invocation);
return result;
} catch (Throwable t) {
isException = true;
if(t instanceof RuntimeException) {
throw (RuntimeException) t;
}
else {
throw new RpcException("unexpected exception when ExecuteLimitFilter", t);
}
}
finally {
RpcStatus.endCount(url, methodName, System.currentTimeMillis() - begin, isException);
}
}
Когда провайдер обрабатывает запрос, он подсчитывает вызов измерения метода.Если количество параллелизма превышает установленное максимальное значение, он сразу генерирует исключение.