Говоря о Dubbo (6): принцип цепочки исходный код ядра-фильтр

задняя часть Spring Алибаба Dubbo

0 Предисловие

Для веб-приложений Java фильтр Spring может перехватывать вызовы веб-интерфейса, но для интерфейсов Dubbo фильтр Spring не работает.

Реализация фильтра в Dubbo:Специально для перехвата процесса вызова сервис-провайдеров и сервис-потребителей большинство собственных функций Dubbo реализовано на основе этой точки расширения, перехват будет выполняться каждый раз при выполнении удаленного метода, но помните о его влиянии на производительность.

Таким образом, в реальном развитии бизнесаНаиболее часто используемым может быть расширение интерфейса фильтра., внедрить нашу собственную логику обработки в ссылку вызова службы, такую ​​как печать журнала, статистика времени вызова и т. д.

Официальный представитель Dubbo сделал большую встроенную поддержку Filter,На данный момент их около 20, включая всем известный RpcContext и функцию accesslog, которые все реализованы через фильтры., давайте подробно рассмотрим реализацию Filter.

1 Построить цепочку фильтров

Запись реализации фильтра Dubbo:В ProtocolFilterWrapper, поскольку ProtocolFilterWrapper является классом-оболочкой для протокола., поэтому оно будет автоматически упаковано при загрузке расширения (поймите, что предпосылка здесь заключается в том, чтоПонимание механизма SPI Dubbo), оболочка реализует интерфейс протокола и предоставляет конструктору тип параметра протокола.Dubbo идентифицирует оболочку на основе этого метода построения и использует оболочку в качестве прокси для других реализаций интерфейса протокола..

Далее давайте посмотрим, как строится цепочка фильтров в ProtocolFilterWrapper:

public class ProtocolFilterWrapper implements Protocol {
    private final Protocol protocol;
    // 带参数构造器,ExtensionLoad通过该构造器识别封装器
    public ProtocolFilterWrapper(Protocol protocol){
        if (protocol == null) {
            throw new IllegalArgumentException("protocol == null");
        }
        this.protocol = protocol;
    }
    public int getDefaultPort() {
        return protocol.getDefaultPort();
    }
    // 对提供方服务暴露进行封装,组装filter调用链
    public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
        // 向注册中心发布服务的时候并不会进行filter调用链
        if (Constants.REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) {
            return protocol.export(invoker);
        }
        return protocol.export(buildInvokerChain(invoker, Constants.SERVICE_FILTER_KEY, Constants.PROVIDER));
    }
    // 对消费方服务引用进行封装,组装filter调用链
    public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
        // 向注册中心引用服务的时候并不会进行filter调用链
        if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
            return protocol.refer(type, url);
        }
        return buildInvokerChain(protocol.refer(type, url), Constants.REFERENCE_FILTER_KEY, Constants.CONSUMER);
    }
    public void destroy() {
        protocol.destroy();
    }
    // 构造filter调用链
    private static <T> Invoker<T> buildInvokerChain(final Invoker<T> invoker, String key, String group) {
        Invoker<T> last = invoker;
        // 获得所有激活的Filter(已经排好序的)
        List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group);
        if (filters.size() > 0) {
            for (int i = filters.size() - 1; i >= 0; i --) {
                final Filter filter = filters.get(i);
                // 复制引用,构建filter调用链
                final Invoker<T> next = last;
                // 这里只是构造一个最简化的Invoker作为调用链的载体Invoker
                last = new Invoker<T>() {
                    public Class<T> getInterface() {
                        return invoker.getInterface();
                    }
                    public URL getUrl() {
                        return invoker.getUrl();
                    }
                    public boolean isAvailable() {
                        return invoker.isAvailable();
                    }
                    // 关键代码,单向链表指针传递
                    public Result invoke(Invocation invocation) throws RpcException {
                        return filter.invoke(next, invocation);
                    }
                    public void destroy() {
                        invoker.destroy();
                    }
                    @Override
                    public String toString() {
                        return invoker.toString();
                    }
                };
            }
        }
        return last;
    }
}

Ключевой код здесь находится в методе buildInvokerChain, а вызывающий параметр — это фактическая служба (Для потребителя это динамический прокси службы.). Получите отсортированный список фильтров из ExtensionLoader (правила сортировки см. в разделе ActivateComparator), а затем начните сборку в обратном порядке.

Это типичный шаблон декоратора, но каждый узел в цепочке декораторов является экземпляром анонимного внутреннего класса Invoker..

  1. Каждый вызывающий узел содержит ссылку на фильтр, ссылку на подчиненный узел вызывающего и экземпляр вызывающего, который фактически вызывается (Хотя он удерживается, но фактически не вызывается, он только обеспечивает функцию получения фактических параметров, связанных с вызывающей стороной, таких как getInterface, getUrl и другие методы.);
  2. Через метод вызоваУзел-вызывающий передает подчиненный узел текущему фильтру для вызова.;
  3. Фильтр При выполнении метода InvokeЭто заставит инициатора подчиненного узла вызвать его метод вызова., чтобы реализовать понижающий проход вызова;
  4. При достижении узла вызывающего последнего уровняТо есть реальный вызывающий сервис может выполнять реальную бизнес-логику.;

Каждый узел этой цепочки вызовов добавляет пользовательские функции к реальному вызывающему объекту и постоянно дополняет функции по всей цепочке, что является типичным шаблоном декоратора.

Увидев приведенный выше контент, мы можем примерно понять, что реализация такова,Получив все цепочки фильтров, которые можно активировать, и затем построив цепочку вызовов фильтров в определенном порядке, окончательная цепочка вызовов будет примерно такой: Filter1->Filter2->Filter3->...->Invoker, логика построения цепочки Filter очень проста,Дело в том, как получить активированную цепочку фильтров.

// 将key在url中对应的配置值切换成字符串信息数组
public List<T> getActivateExtension(URL url, String key, String group) {
    String value = url.getParameter(key);
    return getActivateExtension(url, value == null || value.length() == 0 ? null : Constants.COMMA_SPLIT_PATTERN.split(value), group);
}
    
public List<T> getActivateExtension(URL url, String[] values, String group) {
    List<T> exts = new ArrayList<T>();
    // 所有用户自己配置的filter信息(有些Filter是默认激活的,有些是配置激活的,这里这里的names就指的配置激活的filter信息)
    List<String> names = values == null ? new ArrayList<String>(0) : Arrays.asList(values);

    // 如果这些名称里不包括去除default的标志(-default),换言之就是加载Dubbo提供的默认Filter
    if (! names.contains(Constants.REMOVE_VALUE_PREFIX + Constants.DEFAULT_KEY)) {
        // 加载extension信息
        getExtensionClasses();
        for (Map.Entry<String, Activate> entry : cachedActivates.entrySet()) {
            // name指的是SPI读取的配置文件的key
            String name = entry.getKey();
            Activate activate = entry.getValue();
            // group主要是区分是在provider端生效还是consumer端生效
            if (isMatchGroup(group, activate.group())) {
                T ext = getExtension(name);
                // 这里以Filter为例:三个判断条件的含义依次是:
                // 1. 用户配置的filter列表中不包含当前ext
                // 2. 用户配置的filter列表中不包含当前ext的加-的key
                // 3. 如果用户的配置信息(url中体现)中有可以激活的配置key并且数据不为0,false,null,N/A,也就是说有正常的使用
                if (! names.contains(name)
                        && ! names.contains(Constants.REMOVE_VALUE_PREFIX + name) 
                        && isActive(activate, url)) {
                    exts.add(ext);
                }
            }
        }
        // 根据Activate注解上的order排序
        Collections.sort(exts, ActivateComparator.COMPARATOR);
    }
    // 进行到此步骤的时候Dubbo提供的原生的Filter已经被添加完毕了,下面处理用户自己扩展的Filter
    List<T> usrs = new ArrayList<T>();
    for (int i = 0; i < names.size(); i ++) {
        String name = names.get(i);
        // 如果单个name不是以-开头并且所有的key里面并不包含-'name'(也就是说如果配置成了"dubbo,-dubbo"这种的可以,这个if是进不去的)
        if (! name.startsWith(Constants.REMOVE_VALUE_PREFIX)
                && ! names.contains(Constants.REMOVE_VALUE_PREFIX + name)) {
            // 可以通过default关键字替换Dubbo原生的Filter链,主要用来控制调用链顺序
            if (Constants.DEFAULT_KEY.equals(name)) {
                if (usrs.size() > 0) {
                    exts.addAll(0, usrs);
                    usrs.clear();
                }
            } else {
                // 加入用户自己定义的扩展Filter
                T ext = getExtension(name);
                usrs.add(ext);
            }
        }
    }
    if (usrs.size() > 0) {
        exts.addAll(usrs);
    }
    return exts;
}

В принципе, вы можете увидеть, как загружается цепочка фильтров здесь, дизайн здесь очень гибкий, и я не могу не вздохнуть:Просто настроив «-», вы можете вручную удалить собственный фильтр загрузки Dubbo и использовать значение по умолчанию для замены собственного фильтра Dubbo, который будет загружаться для управления порядком.. Хотя эти конструкции представляют собой небольшие функциональные точки, общее ощущение очень гибкое и продуманное.

Таким образом, из приведенного выше анализа исходного кода мы знаем, что:

Цепочка фильтров по умолчанию: сначала выполните собственный фильтр, затем по очереди выполните пользовательский фильтр, а затем вернитесь к источнику..

Зная процесс построения фильтра, мы подробно рассмотрим некоторые из наиболее важных сведений о фильтре. Сначала взгляните на исходный код интерфейса com.alibaba.dubbo.rpc.Filter:

@SPI
public interface Filter {
    Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException;
}

Собственный фильтр Dubbo определяется в файле META-INF/dubbo/internal/com.alibaba.dubbo.rpc.filter следующим образом:

echo=com.alibaba.dubbo.rpc.filter.EchoFilter
generic=com.alibaba.dubbo.rpc.filter.GenericFilter
genericimpl=com.alibaba.dubbo.rpc.filter.GenericImplFilter
token=com.alibaba.dubbo.rpc.filter.TokenFilter
accesslog=com.alibaba.dubbo.rpc.filter.AccessLogFilter
activelimit=com.alibaba.dubbo.rpc.filter.ActiveLimitFilter
classloader=com.alibaba.dubbo.rpc.filter.ClassLoaderFilter
context=com.alibaba.dubbo.rpc.filter.ContextFilter
consumercontext=com.alibaba.dubbo.rpc.filter.ConsumerContextFilter
exception=com.alibaba.dubbo.rpc.filter.ExceptionFilter
executelimit=com.alibaba.dubbo.rpc.filter.ExecuteLimitFilter
deprecated=com.alibaba.dubbo.rpc.filter.DeprecatedFilter
compatible=com.alibaba.dubbo.rpc.filter.CompatibleFilter
timeout=com.alibaba.dubbo.rpc.filter.TimeoutFilter
monitor=com.alibaba.dubbo.monitor.support.MonitorFilter
validation=com.alibaba.dubbo.validation.filter.ValidationFilter
cache=com.alibaba.dubbo.cache.filter.CacheFilter
trace=com.alibaba.dubbo.rpc.protocol.dubbo.filter.TraceFilter
future=com.alibaba.dubbo.rpc.protocol.dubbo.filter.FutureFilter

Собственный фильтр тайм-аута Dubbo TimeoutFilter реализован следующим образом:

@Activate(group = Constants.PROVIDER)
public class TimeoutFilter implements Filter {
    private static final Logger logger = LoggerFactory.getLogger(TimeoutFilter.class);
    public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
        long start = System.currentTimeMillis();
        Result result = invoker.invoke(invocation);
        long elapsed = System.currentTimeMillis() - start;
        if (invoker.getUrl() != null
                && elapsed > invoker.getUrl().getMethodParameter(invocation.getMethodName(),
                        "timeout", Integer.MAX_VALUE)) {
            if (logger.isWarnEnabled()) {
                logger.warn("invoke time out. method: " + invocation.getMethodName()
                        + "arguments: " + Arrays.toString(invocation.getArguments()) + " , url is "
                        + invoker.getUrl() + ", invoke elapsed " + elapsed + " ms.");
            }
        }
        return result;
    }
}
  1. Нужна ли аннотация @Activate для Dubbo Filter и какие роли в ней играют группа и порядок?

    Для нативного фильтра Dubbo необходима аннотация @Activate, ее группа используется для очереди провайдера/потребителя, а значение ордера является основой для ордера фильтра. Но для пользовательских фильтров аннотация @Activate не используется, а их группировка и порядок полностью задаются пользовательской ручной настройкой.Если пользовательские фильтры снабжены аннотацией @Activate и указана группа, эти настраиваемые фильтры будут обновлены до собственных групп фильтров..

  2. Можно ли настроить порядок фильтров и как?

    регулируемый,Некоторые фильтры можно удалить с помощью символа «-», а значение по умолчанию представляет собой собственную подцепочку фильтров, активированную по умолчанию. Путем изменения порядка фильтров по умолчанию и пользовательских фильтров достигается цель управления порядком..

Давайте создадим несколько случаев, чтобы увидеть, насколько удовлетворяет конфигурация. Предположим, что объектом пользовательского фильтра является filter1, filter2:

случай 1: порядок выполнения: собственная подцепочка фильтров->filter1->filter2

<dubbo:reference filter="filter1,filter2"/>

случай 2: порядок выполнения: filter1->filter2->собственная подцепочка фильтров

<dubbo:reference filter="filter1,filter2,default"/>

случай 3: порядок выполнения: filter1->подцепочка собственного фильтра->filter2, при удалении исходного TokenFilter(token)

<dubbo:service filter="filter1,default,filter2,-token"/>

Если различать фильтр с функциональной стороны, то он в основном делится на потребителя и поставщика., давайте воспримем это как отличие, чтобы представить некоторые ключевые фильтры.

2 Consumer

2.1 ConsumerContextFilter

package com.alibaba.dubbo.rpc.filter;

import com.alibaba.dubbo.common.Constants;
import com.alibaba.dubbo.common.extension.Activate;
import com.alibaba.dubbo.common.utils.NetUtils;
import com.alibaba.dubbo.rpc.Filter;
import com.alibaba.dubbo.rpc.Invocation;
import com.alibaba.dubbo.rpc.Invoker;
import com.alibaba.dubbo.rpc.Result;
import com.alibaba.dubbo.rpc.RpcContext;
import com.alibaba.dubbo.rpc.RpcException;
import com.alibaba.dubbo.rpc.RpcInvocation;

/**
 * ConsumerContextInvokerFilter(默认触发)
 */
@Activate(group = Constants.CONSUMER, order = -10000)
public class ConsumerContextFilter implements Filter {

    public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
        // 在当前的RpcContext中记录本地调用的一次状态信息
        RpcContext.getContext()
                .setInvoker(invoker)
                .setInvocation(invocation)
                .setLocalAddress(NetUtils.getLocalHost(), 0)
                .setRemoteAddress(invoker.getUrl().getHost(),
                        invoker.getUrl().getPort());
        if (invocation instanceof RpcInvocation) {
            ((RpcInvocation) invocation).setInvoker(invoker);
        }
        try {
            return invoker.invoke(invocation);
        } finally {
            RpcContext.getContext().clearAttachments();
        }
    }

}

На самом деле посмотреть на этот Фильтр очень просто,Как он передает неявные параметры, установленные клиентом, на сервер??

Носителем является объект Invocation.Когда клиент вызывает метод Invoker.invoke,Он возьмет свойство вложений в регистраторе текущего состояния RpcContext, а затем установит его в объект RpcInvocation.Когда RpcInvocation передается провайдеру, он сбрасывает объект RpcInvocation обратно в RpcContext через другой фильтр, ContextFilter, для логики сервера. повторно получить неявный параметр.

Вот почему RpcContext может записывать информацию о статусе запроса только один раз,Поскольку параметры были перезаписаны новым RpcInvocation во втором вызове, информация о первом запросе не видна при втором выполнении.

2.2 ActiveLimitFilter

package com.alibaba.dubbo.rpc.filter;

import com.alibaba.dubbo.common.Constants;
import com.alibaba.dubbo.common.URL;
import com.alibaba.dubbo.common.extension.Activate;
import com.alibaba.dubbo.rpc.Filter;
import com.alibaba.dubbo.rpc.Invocation;
import com.alibaba.dubbo.rpc.Invoker;
import com.alibaba.dubbo.rpc.Result;
import com.alibaba.dubbo.rpc.RpcException;
import com.alibaba.dubbo.rpc.RpcStatus;

/**
 * LimitInvokerFilter(当配置了actives并且值不为0的时候触发)
 */
@Activate(group = Constants.CONSUMER, value = Constants.ACTIVES_KEY)
public class ActiveLimitFilter implements Filter {

    public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
        URL url = invoker.getUrl();
        String methodName = invocation.getMethodName();
        int max = invoker.getUrl().getMethodParameter(methodName, Constants.ACTIVES_KEY, 0);
        // 主要记录每台机器针对某个方法的并发数量
        RpcStatus count = RpcStatus.getStatus(invoker.getUrl(), invocation.getMethodName());
        if (max > 0) {
            long timeout = invoker.getUrl().getMethodParameter(invocation.getMethodName(), Constants.TIMEOUT_KEY, 0);
            long start = System.currentTimeMillis();
            long remain = timeout;
            int active = count.getActive();
            if (active >= max) {
                synchronized (count) {
                    // 这个while循环是必要的,因为在一次wait结束后,可能线程调用已经结束了,腾出来consumer的空间
                    while ((active = count.getActive()) >= max) {
                        try {
                            count.wait(remain);
                        } catch (InterruptedException e) {
                        }
                        // 如果wait方法被中断的话,remain这时候有可能大于0
                        // 如果其中一个线程运行结束后自动调用notify方法的话,也有可能remain大于0
                        long elapsed = System.currentTimeMillis() - start;
                        remain = timeout - elapsed;
                        if (remain <= 0) {
                            throw new RpcException("Waiting concurrent invoke timeout in client-side for service:  "
                                    + invoker.getInterface().getName() + ", method: "
                                    + invocation.getMethodName() + ", elapsed: " + elapsed
                                    + ", timeout: " + timeout + ". concurrent invokes: " + active
                                    + ". max concurrent invoke limit: " + max);
                        }
                    }
                }
            }
        }
        try {
            // 调用开始和结束后增减并发数量
            long begin = System.currentTimeMillis();
            RpcStatus.beginCount(url, methodName);
            try {
                Result result = invoker.invoke(invocation);
                RpcStatus.endCount(url, methodName, System.currentTimeMillis() - begin, true);
                return result;
            } catch (RuntimeException t) {
                RpcStatus.endCount(url, methodName, System.currentTimeMillis() - begin, false);
                throw t;
            }
        } finally {
            if (max > 0) {
                // 这里很关键,因为一个调用完成后要通知正在等待执行的队列
                synchronized (count) {
                    count.notify();
                }
            }
        }
    }

}

ActiveLimitFilter в основном используется дляОграничьте количество одновременных вызовов серверного метода одним и тем же клиентом(Текущий лимит клиента).

2.3 FutureFilter

Future в основном имеет дело с информацией о событиях, в основном включая следующие события:

  1. oninvoke запускается до вызова метода (если вызов ненормальный, метод onthrow будет запущен напрямую)
  2. onreturn будет запущен, когда метод вернется (если в вызове возникнет исключение, метод onthrow будет запущен напрямую)
  3. Запускается, когда возникает исключение в вызове onthrow
package com.alibaba.dubbo.rpc.protocol.dubbo.filter;

import com.alibaba.dubbo.common.Constants;
import com.alibaba.dubbo.common.extension.Activate;
import com.alibaba.dubbo.common.logger.Logger;
import com.alibaba.dubbo.common.logger.LoggerFactory;
import com.alibaba.dubbo.remoting.exchange.ResponseCallback;
import com.alibaba.dubbo.remoting.exchange.ResponseFuture;
import com.alibaba.dubbo.rpc.Filter;
import com.alibaba.dubbo.rpc.Invocation;
import com.alibaba.dubbo.rpc.Invoker;
import com.alibaba.dubbo.rpc.Result;
import com.alibaba.dubbo.rpc.RpcContext;
import com.alibaba.dubbo.rpc.RpcException;
import com.alibaba.dubbo.rpc.StaticContext;
import com.alibaba.dubbo.rpc.protocol.dubbo.FutureAdapter;
import com.alibaba.dubbo.rpc.support.RpcUtils;

import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.concurrent.Future;

/**
 * EventFilter
 */
@Activate(group = Constants.CONSUMER)
public class FutureFilter implements Filter {

    protected static final Logger logger = LoggerFactory.getLogger(FutureFilter.class);

    public Result invoke(final Invoker<?> invoker, final Invocation invocation) throws RpcException {
        final boolean isAsync = RpcUtils.isAsync(invoker.getUrl(), invocation);
        
        // 这里主要处理回调逻辑,主要区分三个时间:oninvoke:调用前触发,onreturn:调用后触发 onthrow:出现异常情况时候触发
        fireInvokeCallback(invoker, invocation);
        
        // 需要在调用前配置好是否有返回值,已供invoker判断是否需要返回future.
        Result result = invoker.invoke(invocation);
        if (isAsync) {
            asyncCallback(invoker, invocation);
        } else {
            syncCallback(invoker, invocation, result);
        }
        return result;
    }
    
    private void syncCallback(final Invoker<?> invoker, final Invocation invocation, final Result result) {
        if (result.hasException()) {
            fireThrowCallback(invoker, invocation, result.getException());
        } else {
            fireReturnCallback(invoker, invocation, result.getValue());
        }
    }
    /**
     * 同步异步的主要处理区别:
     * 1. 同步调用的话,事件触发是直接调用的,没有任何逻辑;
     * 2. 异步的话就是首先获取到调用产生的Future对象,然后复写Future的done()方法,
     *    将fireThrowCallback和fireReturnCallback逻辑引入即可。
     */
    private void asyncCallback(final Invoker<?> invoker, final Invocation invocation) {
        Future<?> f = RpcContext.getContext().getFuture();
        if (f instanceof FutureAdapter) {
            ResponseFuture future = ((FutureAdapter<?>) f).getFuture();
            future.setCallback(new ResponseCallback() {
                public void done(Object rpcResult) {
                    if (rpcResult == null) {
                        logger.error(new IllegalStateException("invalid result value : null, expected " + Result.class.getName()));
                        return;
                    }
                    ///must be rpcResult
                    if (!(rpcResult instanceof Result)) {
                        logger.error(new IllegalStateException("invalid result type :" + rpcResult.getClass() + ", expected " + Result.class.getName()));
                        return;
                    }
                    Result result = (Result) rpcResult;
                    if (result.hasException()) {
                        fireThrowCallback(invoker, invocation, result.getException());
                    } else {
                        fireReturnCallback(invoker, invocation, result.getValue());
                    }
                }

                public void caught(Throwable exception) {
                    fireThrowCallback(invoker, invocation, exception);
                }
            });
        }
    }

    private void fireInvokeCallback(final Invoker<?> invoker, final Invocation invocation) {
        final Method onInvokeMethod = (Method) StaticContext.getSystemContext().get(StaticContext.getKey(invoker.getUrl(), invocation.getMethodName(), Constants.ON_INVOKE_METHOD_KEY));
        final Object onInvokeInst = StaticContext.getSystemContext().get(StaticContext.getKey(invoker.getUrl(), invocation.getMethodName(), Constants.ON_INVOKE_INSTANCE_KEY));

        if (onInvokeMethod == null && onInvokeInst == null) {
            return;
        }
        if (onInvokeMethod == null || onInvokeInst == null) {
            throw new IllegalStateException("service:" + invoker.getUrl().getServiceKey() + " has a onreturn callback config , but no such " + (onInvokeMethod == null ? "method" : "instance") + " found. url:" + invoker.getUrl());
        }
        // 由于JDK的安全检查耗时较多.所以通过setAccessible(true)的方式关闭安全检查就可以达到提升反射速度的目的
        if (!onInvokeMethod.isAccessible()) {
            onInvokeMethod.setAccessible(true);
        }
        // 从下面代码可以看出oninvoke的方法参数要与调用的方法参数一致
        Object[] params = invocation.getArguments();
        try {
            onInvokeMethod.invoke(onInvokeInst, params);
        } catch (InvocationTargetException e) {
            fireThrowCallback(invoker, invocation, e.getTargetException());
        } catch (Throwable e) {
            fireThrowCallback(invoker, invocation, e);
        }
    }
    
    // fireReturnCallback的逻辑与fireThrowCallback基本一样,所以不用看了
    private void fireReturnCallback(final Invoker<?> invoker, final Invocation invocation, final Object result) {
        final Method onReturnMethod = (Method) StaticContext.getSystemContext().get(StaticContext.getKey(invoker.getUrl(), invocation.getMethodName(), Constants.ON_RETURN_METHOD_KEY));
        final Object onReturnInst = StaticContext.getSystemContext().get(StaticContext.getKey(invoker.getUrl(), invocation.getMethodName(), Constants.ON_RETURN_INSTANCE_KEY));

        //not set onreturn callback
        if (onReturnMethod == null && onReturnInst == null) {
            return;
        }

        if (onReturnMethod == null || onReturnInst == null) {
            throw new IllegalStateException("service:" + invoker.getUrl().getServiceKey() + " has a onreturn callback config , but no such " + (onReturnMethod == null ? "method" : "instance") + " found. url:" + invoker.getUrl());
        }
        if (!onReturnMethod.isAccessible()) {
            onReturnMethod.setAccessible(true);
        }

        Object[] args = invocation.getArguments();
        Object[] params;
        Class<?>[] rParaTypes = onReturnMethod.getParameterTypes();
        if (rParaTypes.length > 1) {
            if (rParaTypes.length == 2 && rParaTypes[1].isAssignableFrom(Object[].class)) {
                params = new Object[2];
                params[0] = result;
                params[1] = args;
            } else {
                params = new Object[args.length + 1];
                params[0] = result;
                System.arraycopy(args, 0, params, 1, args.length);
            }
        } else {
            params = new Object[]{result};
        }
        try {
            onReturnMethod.invoke(onReturnInst, params);
        } catch (InvocationTargetException e) {
            fireThrowCallback(invoker, invocation, e.getTargetException());
        } catch (Throwable e) {
            fireThrowCallback(invoker, invocation, e);
        }
    }

    // fireReturnCallback的逻辑与fireThrowCallback基本一样,所以不用看了
    private void fireThrowCallback(final Invoker<?> invoker, final Invocation invocation, final Throwable exception) {
        final Method onthrowMethod = (Method) StaticContext.getSystemContext().get(StaticContext.getKey(invoker.getUrl(), invocation.getMethodName(), Constants.ON_THROW_METHOD_KEY));
        final Object onthrowInst = StaticContext.getSystemContext().get(StaticContext.getKey(invoker.getUrl(), invocation.getMethodName(), Constants.ON_THROW_INSTANCE_KEY));

        //onthrow callback not configured
        if (onthrowMethod == null && onthrowInst == null) {
            return;
        }
        if (onthrowMethod == null || onthrowInst == null) {
            throw new IllegalStateException("service:" + invoker.getUrl().getServiceKey() + " has a onthrow callback config , but no such " + (onthrowMethod == null ? "method" : "instance") + " found. url:" + invoker.getUrl());
        }
        if (!onthrowMethod.isAccessible()) {
            onthrowMethod.setAccessible(true);
        }
        Class<?>[] rParaTypes = onthrowMethod.getParameterTypes();
        if (rParaTypes[0].isAssignableFrom(exception.getClass())) {
            try {
                // 因为onthrow方法的参数第一个值必须为异常信息,所以这里需要构造参数列表
                Object[] args = invocation.getArguments();
                Object[] params;

                if (rParaTypes.length > 1) {
                    // 回调方法只有一个参数而且这个参数是数组(单独拎出来计算的好处是这样可以少复制一个数组)
                    if (rParaTypes.length == 2 && rParaTypes[1].isAssignableFrom(Object[].class)) {
                        params = new Object[2];
                        params[0] = exception;
                        params[1] = args;
                    } else {
                        // 回调方法有多于一个参数
                        params = new Object[args.length + 1];
                        params[0] = exception;
                        System.arraycopy(args, 0, params, 1, args.length);
                    }
                } else {
                    // 回调方法没有参数
                    params = new Object[]{exception};
                }
                onthrowMethod.invoke(onthrowInst, params);
            } catch (Throwable e) {
                logger.error(invocation.getMethodName() + ".call back method invoke error . callback method :" + onthrowMethod + ", url:" + invoker.getUrl(), e);
            }
        } else {
            logger.error(invocation.getMethodName() + ".call back method invoke error . callback method :" + onthrowMethod + ", url:" + invoker.getUrl(), exception);
        }
    }
}

3 Provider

3.1 ContextFilter

ContextFilter и ConsumerContextFilter используются в комбинации, я видел ConsumerContextFilter в предыдущем введении, давайте кратко рассмотрим ContextFilter, чтобы проверить логику, упомянутую выше.

package com.alibaba.dubbo.rpc.filter;

import com.alibaba.dubbo.common.Constants;
import com.alibaba.dubbo.common.extension.Activate;
import com.alibaba.dubbo.rpc.Filter;
import com.alibaba.dubbo.rpc.Invocation;
import com.alibaba.dubbo.rpc.Invoker;
import com.alibaba.dubbo.rpc.Result;
import com.alibaba.dubbo.rpc.RpcContext;
import com.alibaba.dubbo.rpc.RpcException;
import com.alibaba.dubbo.rpc.RpcInvocation;

import java.util.HashMap;
import java.util.Map;

/**
 * ContextInvokerFilter
 */
@Activate(group = Constants.PROVIDER, order = -10000)
public class ContextFilter implements Filter {

    public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
        Map<String, String> attachments = invocation.getAttachments();
        if (attachments != null) {
            // 隐式参数重剔除一些核心消息
            attachments = new HashMap<String, String>(attachments);
            attachments.remove(Constants.PATH_KEY);
            attachments.remove(Constants.GROUP_KEY);
            attachments.remove(Constants.VERSION_KEY);
            attachments.remove(Constants.DUBBO_VERSION_KEY);
            attachments.remove(Constants.TOKEN_KEY);
            attachments.remove(Constants.TIMEOUT_KEY);
            attachments.remove(Constants.ASYNC_KEY);// Remove async property to avoid being passed to the following invoke chain.
        }
        RpcContext.getContext()
                .setInvoker(invoker)
                .setInvocation(invocation)
//                .setAttachments(attachments)  // merged from dubbox
                .setLocalAddress(invoker.getUrl().getHost(),
                        invoker.getUrl().getPort());

        // mreged from dubbox
        // we may already added some attachments into RpcContext before this filter (e.g. in rest protocol)
        if (attachments != null) {
            // 这里又重新将invocation和attachments信息设置到RpcContext,
            // 这里设置以后provider的代码就可以获取到consumer端传递的一些隐式参数了
            if (RpcContext.getContext().getAttachments() != null) {
                RpcContext.getContext().getAttachments().putAll(attachments);
            } else {
                RpcContext.getContext().setAttachments(attachments);
            }
        }

        if (invocation instanceof RpcInvocation) {
            ((RpcInvocation) invocation).setInvoker(invoker);
        }
        try {
            return invoker.invoke(invocation);
        } finally {
            RpcContext.removeContext();
        }
    }
}

3.2 EchoFilter

Эхо-тест в основном используется для определения нормальности службы (состояние сети)., Если вы просто определяете сетевую ситуацию, вам не нужно выполнять реальную бизнес-логику, поэтому вы можете проверить ее с помощью фильтра.

package com.alibaba.dubbo.rpc.filter;

import com.alibaba.dubbo.common.Constants;
import com.alibaba.dubbo.common.extension.Activate;
import com.alibaba.dubbo.rpc.Filter;
import com.alibaba.dubbo.rpc.Invocation;
import com.alibaba.dubbo.rpc.Invoker;
import com.alibaba.dubbo.rpc.Result;
import com.alibaba.dubbo.rpc.RpcException;
import com.alibaba.dubbo.rpc.RpcResult;

/**
 * EchoInvokerFilter
 */
@Activate(group = Constants.PROVIDER, order = -110000)
public class EchoFilter implements Filter {

    public Result invoke(Invoker<?> invoker, Invocation inv) throws RpcException {
        if (inv.getMethodName().equals(Constants.$ECHO) && inv.getArguments() != null && inv.getArguments().length == 1)
            return new RpcResult(inv.getArguments()[0]);
        return invoker.invoke(inv);
    }

}

3.3 ExecuteLimitFilter

Конкретная логика выполнения текущего лимита интерфейса сервера находится в ExecuteLimitFilter,Поскольку серверу не нужно учитывать логику ожидания повторных попыток, как только количество выполняемых в данный момент потоков превышает указанное число, он возвращается непосредственно к ошибке., поэтому логика реализации намного проще, чем ActiveLimitFilter.

package com.alibaba.dubbo.rpc.filter;

import com.alibaba.dubbo.common.Constants;
import com.alibaba.dubbo.common.URL;
import com.alibaba.dubbo.common.extension.Activate;
import com.alibaba.dubbo.rpc.Filter;
import com.alibaba.dubbo.rpc.Invocation;
import com.alibaba.dubbo.rpc.Invoker;
import com.alibaba.dubbo.rpc.Result;
import com.alibaba.dubbo.rpc.RpcException;
import com.alibaba.dubbo.rpc.RpcStatus;

import java.util.concurrent.Semaphore;

/**
 * ThreadLimitInvokerFilter
 */
@Activate(group = Constants.PROVIDER, value = Constants.EXECUTES_KEY)
public class ExecuteLimitFilter implements Filter {

    public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
        URL url = invoker.getUrl();
        String methodName = invocation.getMethodName();
        Semaphore executesLimit = null;
        boolean acquireResult = false;
        int max = url.getMethodParameter(methodName, Constants.EXECUTES_KEY, 0);
        if (max > 0) {
            RpcStatus count = RpcStatus.getStatus(url, invocation.getMethodName());
//            if (count.getActive() >= max) {
            /**
             * http://manzhizhen.iteye.com/blog/2386408
             * use semaphore for concurrency control (to limit thread number)
             */
            executesLimit = count.getSemaphore(max);
            if(executesLimit != null && !(acquireResult = executesLimit.tryAcquire())) {
                throw new RpcException("Failed to invoke method " + invocation.getMethodName() + " in provider " + url + ", cause: The service using threads greater than <dubbo:service executes=\"" + max + "\" /> limited.");
            }
        }
        long begin = System.currentTimeMillis();
        boolean isSuccess = true;
        RpcStatus.beginCount(url, methodName);
        try {
            Result result = invoker.invoke(invocation);
            return result;
        } catch (Throwable t) {
            isSuccess = false;
            if (t instanceof RuntimeException) {
                throw (RuntimeException) t;
            } else {
                throw new RpcException("unexpected exception when ExecuteLimitFilter", t);
            }
        } finally {
            RpcStatus.endCount(url, methodName, System.currentTimeMillis() - begin, isSuccess);
            if(acquireResult) {
                executesLimit.release();
            }
        }
    }
}

3.4 ExceptionFilter

В Dubbo есть собственный набор правил обработки исключений:

  1. еслипроверенное исключениебросается прямо;
  2. Если это непроверенное исключениеНо есть объявление на интерфейсе, также будет выброшено напрямую;
  3. Если класс исключения и класс интерфейса находятся в одном пакете jar, бросить прямо;
  4. еслисобственное исключение JDK, бросить прямо;
  5. еслиИсключение Даббо, бросить прямо;
  6. Остальные заворачиваются в RuntimeException, а затем выбрасываются (чтобы избежать проблемы, связанной с тем, что исключение не может быть десериализовано в клиенте).;
package com.alibaba.dubbo.rpc.filter;

import com.alibaba.dubbo.common.Constants;
import com.alibaba.dubbo.common.extension.Activate;
import com.alibaba.dubbo.common.logger.Logger;
import com.alibaba.dubbo.common.logger.LoggerFactory;
import com.alibaba.dubbo.common.utils.ReflectUtils;
import com.alibaba.dubbo.common.utils.StringUtils;
import com.alibaba.dubbo.rpc.Filter;
import com.alibaba.dubbo.rpc.Invocation;
import com.alibaba.dubbo.rpc.Invoker;
import com.alibaba.dubbo.rpc.Result;
import com.alibaba.dubbo.rpc.RpcContext;
import com.alibaba.dubbo.rpc.RpcException;
import com.alibaba.dubbo.rpc.RpcResult;
import com.alibaba.dubbo.rpc.service.GenericService;

import java.lang.reflect.Method;

/**
 * ExceptionInvokerFilter
 * <p>
 * Functions:
 * <ol>
 * <li>unexpected exception will be logged in ERROR level on provider side. Unexpected exception are unchecked
 * exception not declared on the interface</li>
 * <li>Wrap the exception not introduced in API package into RuntimeException. Framework will serialize the outer exception but stringnize its cause in order to avoid of possible serialization problem on client side</li>
 * </ol>
 */
@Activate(group = Constants.PROVIDER)
public class ExceptionFilter implements Filter {

    private final Logger logger;

    public ExceptionFilter() {
        this(LoggerFactory.getLogger(ExceptionFilter.class));
    }

    public ExceptionFilter(Logger logger) {
        this.logger = logger;
    }

    public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
        try {
            Result result = invoker.invoke(invocation);
            if (result.hasException() && GenericService.class != invoker.getInterface()) {
                try {
                    Throwable exception = result.getException();

                    // directly throw if it's checked exception
                    if (!(exception instanceof RuntimeException) && (exception instanceof Exception)) {
                        return result;
                    }
                    // directly throw if the exception appears in the signature
                    try {
                        Method method = invoker.getInterface().getMethod(invocation.getMethodName(), invocation.getParameterTypes());
                        Class<?>[] exceptionClassses = method.getExceptionTypes();
                        for (Class<?> exceptionClass : exceptionClassses) {
                            if (exception.getClass().equals(exceptionClass)) {
                                return result;
                            }
                        }
                    } catch (NoSuchMethodException e) {
                        return result;
                    }

                    // for the exception not found in method's signature, print ERROR message in server's log.
                    logger.error("Got unchecked and undeclared exception which called by " + RpcContext.getContext().getRemoteHost()
                            + ". service: " + invoker.getInterface().getName() + ", method: " + invocation.getMethodName()
                            + ", exception: " + exception.getClass().getName() + ": " + exception.getMessage(), exception);

                    // directly throw if exception class and interface class are in the same jar file.
                    String serviceFile = ReflectUtils.getCodeBase(invoker.getInterface());
                    String exceptionFile = ReflectUtils.getCodeBase(exception.getClass());
                    if (serviceFile == null || exceptionFile == null || serviceFile.equals(exceptionFile)) {
                        return result;
                    }
                    // directly throw if it's JDK exception
                    String className = exception.getClass().getName();
                    if (className.startsWith("java.") || className.startsWith("javax.")) {
                        return result;
                    }
                    // directly throw if it's dubbo exception
                    if (exception instanceof RpcException) {
                        return result;
                    }

                    // otherwise, wrap with RuntimeException and throw back to the client
                    return new RpcResult(new RuntimeException(StringUtils.toString(exception)));
                } catch (Throwable e) {
                    logger.warn("Fail to ExceptionFilter when called by " + RpcContext.getContext().getRemoteHost()
                            + ". service: " + invoker.getInterface().getName() + ", method: " + invocation.getMethodName()
                            + ", exception: " + e.getClass().getName() + ": " + e.getMessage(), e);
                    return result;
                }
            }
            return result;
        } catch (RuntimeException e) {
            logger.error("Got unchecked and undeclared exception which called by " + RpcContext.getContext().getRemoteHost()
                    + ". service: " + invoker.getInterface().getName() + ", method: " + invocation.getMethodName()
                    + ", exception: " + e.getClass().getName() + ": " + e.getMessage(), e);
            throw e;
        }
    }
}

На данный момент мы обсудили несколько основных фильтров в Dubbo.Фильтры на самом деле не такие уж сложные.В процессе разработки вы также можете обратиться к этой идее, чтобы реализовать свою собственную цепочку фильтров.