Параметры и принципы настройки производительности Dubbo

Dubbo

Параметры и принципы настройки производительности Dubbo

1 модель звонка Даббо

2 Общие параметры настройки производительности

имя параметра Объем По умолчанию иллюстрировать Примечание
threads provider 200 Размер пула потоков бизнес-процессов
iothreads provider Количество процессоров + 1 размер пула потоков ввода-вывода
queues provider 0 Размер очереди пула потоков. Когда пул потоков заполнен, размер очереди для постановки в очередь на выполнение не рекомендуется устанавливать. требования.
connections consumer 0 Для максимального количества соединений на одного провайдера протоколы коротких соединений, такие как rmi, http, hessian, указывают предел количества соединений, а протоколы длинных соединений, такие как Dubbo, указывают количество установленных длинных соединений. Протокол Dubbo использует постоянное соединение по умолчанию.
actives consumer 0 Максимальное количество одновременных вызовов на метод на потребителя службы на службу 0 означает отсутствие ограничений
accepts provider 0 Максимальное количество подключений, которое может принять поставщик услуг 0 означает отсутствие ограничений
executes provider 0 Максимальное количество одновременно выполняемых запросов на метод для каждого поставщика услуг 0 означает отсутствие ограничений

3 Исходный код и принципиальный анализ

3.1 threads

FixedThreadPool.java

public Executor getExecutor(URL url) {
    String name = url.getParameter(Constants.THREAD_NAME_KEY, Constants.DEFAULT_THREAD_NAME);
    int threads = url.getParameter(Constants.THREADS_KEY, Constants.DEFAULT_THREADS);
    int queues = url.getParameter(Constants.QUEUES_KEY, Constants.DEFAULT_QUEUES);
    return new ThreadPoolExecutor(threads, threads, 0, TimeUnit.MILLISECONDS, 
            queues == 0 ? new SynchronousQueue<Runnable>() : 
                    (queues < 0 ? new LinkedBlockingQueue<Runnable>() : 
                            new LinkedBlockingQueue<Runnable>(queues)),
            new NamedThreadFactory(name, true), new AbortPolicyWithReport(name, url));
}

LimitedThreadPool.java

public Executor getExecutor(URL url) {
    String name = url.getParameter(Constants.THREAD_NAME_KEY, Constants.DEFAULT_THREAD_NAME);
    int cores = url.getParameter(Constants.CORE_THREADS_KEY, Constants.DEFAULT_CORE_THREADS);
    int threads = url.getParameter(Constants.THREADS_KEY, Constants.DEFAULT_THREADS);
    int queues = url.getParameter(Constants.QUEUES_KEY, Constants.DEFAULT_QUEUES);
    return new ThreadPoolExecutor(cores, threads, Long.MAX_VALUE, TimeUnit.MILLISECONDS, 
        	queues == 0 ? new SynchronousQueue<Runnable>() : 
        			(queues < 0 ? new LinkedBlockingQueue<Runnable>() : 
        			        new LinkedBlockingQueue<Runnable>(queues)),
        	new NamedThreadFactory(name, true), new AbortPolicyWithReport(name, url));
}

где Константы.DEFAULT_QUEUES = 200. Параметр threads настраивает максимальное (или основное) количество потоков в пуле потоков бизнес-процессов.

3.2 iothreads

NettyServer.java

Override
protected void doOpen() throws Throwable {
    NettyHelper.setNettyLoggerFactory();
    ExecutorService boss = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerBoss", true));
    ExecutorService worker = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerWorker", true));
    ChannelFactory channelFactory = new NioServerSocketChannelFactory(boss, worker, getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS));
    bootstrap = new ServerBootstrap(channelFactory);
        
    final NettyHandler nettyHandler = new NettyHandler(getUrl(), this);
    channels = nettyHandler.getChannels();
    bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
        public ChannelPipeline getPipeline() {
            NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec() ,getUrl(), NettyServer.this);
            ChannelPipeline pipeline = Channels.pipeline();
            pipeline.addLast("decoder", adapter.getDecoder());
            pipeline.addLast("encoder", adapter.getEncoder());
            pipeline.addLast("handler", nettyHandler);
            return pipeline;
        }
    });
    // bind
    channel = bootstrap.bind(getBindAddress());
}

3.3 queues

Они используются соответственно в FixedThreadPool.java, LimitedThreadPool.java и CachedThreadPool.java Подробнее о коде см. Раздел 3.2. Из кода видно, что значение по умолчанию равно 0, что означает, что используется синхронная блокирующая очередь, если для queues установлено значение меньше 0, используется блокирующая очередь связанного списка с емкостью Integer.MAX_VALUE; если это другие значения, используется блокирующая очередь связанного списка указанного размера.

3.4 connections

DubboProtocol.java

private ExchangeClient[] getClients(URL url){
    //是否共享连接
    boolean service_share_connect = false;
    int connections = url.getParameter(Constants.CONNECTIONS_KEY, 0);
    //如果connections不配置,则共享连接,否则每服务每连接
    if (connections == 0){
        service_share_connect = true;
        connections = 1;
    }

    ExchangeClient[] clients = new ExchangeClient[connections];
    for (int i = 0; i < clients.length; i++) {
        if (service_share_connect){
            clients[i] = getSharedClient(url);
        } else {
            clients[i] = initClient(url);
        }
    }
    return clients;
}

DubboInvoker.java

Override
protected Result doInvoke(final Invocation invocation) throws Throwable {
    RpcInvocation inv = (RpcInvocation) invocation;
    final String methodName = RpcUtils.getMethodName(invocation);
    inv.setAttachment(Constants.PATH_KEY, getUrl().getPath());
    inv.setAttachment(Constants.VERSION_KEY, version);
    
    ExchangeClient currentClient;
    if (clients.length == 1) {
        currentClient = clients[0];
    } else {
        currentClient = clients[index.getAndIncrement() % clients.length];
    }
    try {
        boolean isAsync = RpcUtils.isAsync(getUrl(), invocation);
        boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
        int timeout = getUrl().getMethodParameter(methodName, Constants.TIMEOUT_KEY,Constants.DEFAULT_TIMEOUT);
        if (isOneway) {
        	boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
            currentClient.send(inv, isSent);
            RpcContext.getContext().setFuture(null);
            return new RpcResult();
        } else if (isAsync) {
        	ResponseFuture future = currentClient.request(inv, timeout) ;
            RpcContext.getContext().setFuture(new FutureAdapter<Object>(future));
            return new RpcResult();
        } else {
        	RpcContext.getContext().setFuture(null);
            return (Result) currentClient.request(inv, timeout).get();
        }
    } catch (TimeoutException e) {
        throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
    } catch (RemotingException e) {
        throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
    }
}

Как видно из вышеизложенного, значение по умолчанию равно 0, что означает, что для каждого провайдера все клиенты совместно используют постоянное соединение, в противном случае устанавливается заданное количество постоянных соединений. При вызове, если есть несколько длинных соединений, используйте метод опроса для получения длинного соединения.

3.5 actives

ActiveLimitFilter.java

public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
    URL url = invoker.getUrl();
    String methodName = invocation.getMethodName();
    int max = invoker.getUrl().getMethodParameter(methodName, Constants.ACTIVES_KEY, 0);
    RpcStatus count = RpcStatus.getStatus(invoker.getUrl(), invocation.getMethodName());
    if (max > 0) {
        long timeout = invoker.getUrl().getMethodParameter(invocation.getMethodName(), Constants.TIMEOUT_KEY, 0);
        long start = System.currentTimeMillis();
        long remain = timeout;
        int active = count.getActive();
        if (active >= max) {
            synchronized (count) {
                while ((active = count.getActive()) >= max) {
                    try {
                        count.wait(remain);
                    } catch (InterruptedException e) {
                    }
                    long elapsed = System.currentTimeMillis() - start;
                    remain = timeout - elapsed;
                    if (remain <= 0) {
                        throw new RpcException("Waiting concurrent invoke timeout in client-side for service:  "
                                               + invoker.getInterface().getName() + ", method: "
                                               + invocation.getMethodName() + ", elapsed: " + elapsed
                                               + ", timeout: " + timeout + ". concurrent invokes: " + active
                                               + ". max concurrent invoke limit: " + max);
                    }
                }
            }
        }
    }
    try {
        long begin = System.currentTimeMillis();
        RpcStatus.beginCount(url, methodName);
        try {
            Result result = invoker.invoke(invocation);
            RpcStatus.endCount(url, methodName, System.currentTimeMillis() - begin, true);
            return result;
        } catch (RuntimeException t) {
            RpcStatus.endCount(url, methodName, System.currentTimeMillis() - begin, false);
            throw t;
        }
    } finally {
        if(max>0){
            synchronized (count) {
                count.notify();
            } 
        }
    }
}

При вызове Потребителя подсчитываются вызовы измерения службы и метода.Если количество одновременных вызовов превышает установленное максимальное значение, текущий поток будет заблокирован до завершения обработки предыдущего запроса.

3.6 accepts

AbstractServer.java

@Override
public void connected(Channel ch) throws RemotingException {
    Collection<Channel> channels = getChannels();
    if (accepts > 0 && channels.size() > accepts) {
        logger.error("Close channel " + ch + ", cause: The server " + ch.getLocalAddress() + " connections greater than max config " + accepts);
        ch.close();
        return;
    }
    super.connected(ch);
}

Когда количество соединений превышает максимальное значение, текущее соединение закрывается.

3.7 executes

ExecuteLimitFilter.jvava

public Result invokeOrg(Invoker<?> invoker, Invocation invocation) throws RpcException {
    URL url = invoker.getUrl();
    String methodName = invocation.getMethodName();
    int max = url.getMethodParameter(methodName, Constants.EXECUTES_KEY, 0);
    if (max > 0) {
        RpcStatus count = RpcStatus.getStatus(url, invocation.getMethodName());
        if (count.getActive() >= max) {
            throw new RpcException("Failed to invoke method " + invocation.getMethodName() + " in provider " + url + ", cause: The service using threads greater than <dubbo:service executes=\"" + max + "\" /> limited.");
        }
    }
    long begin = System.currentTimeMillis();
    boolean isException = false;
    RpcStatus.beginCount(url, methodName);
    try {
        Result result = invoker.invoke(invocation);
        return result;
    } catch (Throwable t) {
        isException = true;
        if(t instanceof RuntimeException) {
            throw (RuntimeException) t;
        }
        else {
            throw new RpcException("unexpected exception when ExecuteLimitFilter", t);
        }
    }
    finally {
        RpcStatus.endCount(url, methodName, System.currentTimeMillis() - begin, isException);
    }
}

Когда провайдер обрабатывает запрос, он подсчитывает вызов измерения метода.Если количество параллелизма превышает установленное максимальное значение, он сразу генерирует исключение.