Прежде всего, поделитесь всеми предыдущими статьями, ставьте лайки, добавляйте в избранное и пересылайте три раза подряд. >>>>😜😜😜
Сборник статей:🎁nuggets.capable/post/694164…
Github :👉github.com/black-ant
Резервное копирование CASE:👉git ee.com/ant black/wipe…
Введение
После прочтения последней статьи о приеме на стороне сервера в Dubbo 3.0, в этой статье мы рассмотрим цепочку фильтров Dubbo, которая также является очень важной частью всего процесса.
II. Начальная точка обработки
// 调用逻辑
C- ChannelEventRunnable # run : 调用的起点 , 此处接受到消息
C- ExchangeHandler # reply : 发起 Invoke 处理流程
C- FilterChainBuilder # invoke : 开启 Invoke Filter 链构建流程
2.1 Вызов процесса инициации
// 上一篇文章已经看过了 , 通过构建一个 ExchangeHandlerAdapter 发起 reply 调用
private ExchangeHandler requestHandler = new ExchangeHandlerAdapter() {
@Override
public CompletableFuture<Object> reply(ExchangeChannel channel, Object message) throws RemotingException {
// 只保留核心逻辑
Invocation inv = (Invocation) message;
// 此处实际调用 FilterChainBuilder
Invoker<?> invoker = getInvoker(channel, inv);
//...........
Result result = invoker.invoke(inv);
}
}
2.2 Процесс построения цепочки фильтров
C- FilterChainBuilder
// 这里构建 Filter 链 , 发起调用操作
public Result invoke(Invocation invocation) throws RpcException {
Result asyncResult;
try {
// filter 链的处理
asyncResult = filter.invoke(nextNode, invocation);
} catch (Exception e) {
// 省略 ,主要是进行 Listerner 的监听 , 又在玩异步
throw e;
} finally {
}
return asyncResult.whenCompleteWithContext((r, t) -> {
// 处理完成后 ,还是要通知监听器
if (filter instanceof ListenableFilter) {
ListenableFilter listenableFilter = ((ListenableFilter) filter);
Filter.Listener listener = listenableFilter.listener(invocation);
try {
if (listener != null) {
if (t == null) {
listener.onResponse(r, originalInvoker, invocation);
} else {
listener.onError(t, originalInvoker, invocation);
}
}
} finally {
listenableFilter.removeListener(invocation);
}
} else if (filter instanceof FILTER.Listener) {
// 对 BaseFilter.Listener 接口进行处理
FILTER.Listener listener = (FILTER.Listener) filter;
if (t == null) {
listener.onResponse(r, originalInvoker, invocation);
} else {
listener.onError(t, originalInvoker, invocation);
}
}
});
}
2.3 Структура вызова цепочки Filter
// 来看一下 FilterChainBuilder , 他的核心对象是其中的一个内部类
class FilterChainNode<T, TYPE extends Invoker<T>, FILTER extends BaseFilter> implements Invoker<T>{
// 源 Invoke 类型
TYPE originalInvoker;
// 下一个 invoke 节点
Invoker<T> nextNode;
// 当前的 filter 对象
FILTER filter;
}
// 下面看一下Filter 的构建的地方 -> DefaultFilterChainBuilder
@Override
public <T> Invoker<T> buildInvokerChain(final Invoker<T> originalInvoker, String key, String group) {
Invoker<T> last = originalInvoker;
// 加载外部 Extension
List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(originalInvoker.getUrl(), key, group);
if (!filters.isEmpty()) {
for (int i = filters.size() - 1; i >= 0; i--) {
final Filter filter = filters.get(i);
final Invoker<T> next = last;
// 可以看到 , 这里循环创建了 FilterChainNode , 这里是责任链模式
last = new FilterChainNode<>(originalInvoker, next, filter);
}
}
return last;
}
// 这里是SPI 的加载方式 , 对应的 SPI 类为
dubbo-rpc-api/META-INF.dubbo.internal -> org.apache.dubbo.rpc.Filter
// 其中包含如下内容
echo=org.apache.dubbo.rpc.filter.EchoFilter
generic=org.apache.dubbo.rpc.filter.GenericFilter
genericimpl=org.apache.dubbo.rpc.filter.GenericImplFilter
token=org.apache.dubbo.rpc.filter.TokenFilter
accesslog=org.apache.dubbo.rpc.filter.AccessLogFilter
classloader=org.apache.dubbo.rpc.filter.ClassLoaderFilter
context=org.apache.dubbo.rpc.filter.ContextFilter
exception=org.apache.dubbo.rpc.filter.ExceptionFilter
executelimit=org.apache.dubbo.rpc.filter.ExecuteLimitFilter
deprecated=org.apache.dubbo.rpc.filter.DeprecatedFilter
compatible=org.apache.dubbo.rpc.filter.CompatibleFilter
timeout=org.apache.dubbo.rpc.filter.TimeoutFilter
tps=org.apache.dubbo.rpc.filter.TpsLimitFilter
// PS : Dubbo SPI 的加载方式类似 , 以后有时间再仔细看
SPI может посмотреть @Наггетс.Талант/пост/698945…
2.4 Разница между Dubbo 3.0 и Dubbo 2.0
Самая большая разница между ними заключается в институциональном изменении Filter,
// Step 1 : Filter 结构
@SPI
public interface Filter extends BaseFilter {
}
// Step 2 : BaseFilter 结构
public interface BaseFilter {
// 始终调用实现中的invoke.invoke(),将请求移交给下一个筛选器节点
Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException;
interface Listener {
// 当调用结束后 ,根据情况选择是否调用
void onResponse(Result appResponse, Invoker<?> invoker, Invocation invocation);
void onError(Throwable t, Invoker<?> invoker, Invocation invocation);
}
}
// 这里接口里面套了一个接口 , 虽然知道可以这么用 ,但是从来没做过 , 原来还能这么用
至于实现的时候 , 和内部类使用方式一样 :
public class ExceptionFilter implements Filter, Filter.Listener
3. Цепочка фильтров
Архитектура фильтра
Список основных фильтров выглядит следующим образом:
- EchoFilter
- ClassLoaderFilter
- GenericFilter
- ContextFilter
- ExceptionFilter
- MonitorFilter
- TimeoutFilter
- TraceFilter
3.1 EchoFilter
ECHO означает эхо, функция этого класса в основном состоит в том, чтобы определить, доступна ли услуга.
public Result invoke(Invoker<?> invoker, Invocation inv) throws RpcException {
// String $ECHO = "$echo";
if (inv.getMethodName().equals($ECHO) && inv.getArguments() != null && inv.getArguments().length == 1) {
// 这里直接构建了一个异步 Result 返回 , 回声服务只为了检测服务是否可用
return AsyncRpcResult.newDefaultAsyncResult(inv.getArguments()[0], inv);
}
return invoker.invoke(inv);
}
3.2 ClassLoaderFilter
Фильтр в основном предназначен для вторичной обработки ClassLoader.
// 将当前执行线程类 ClassLoader 设置为服务接口的类 ClassLoader
public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
ClassLoader ocl = Thread.currentThread().getContextClassLoader();
// 对 ClassLoader 进行切换
Thread.currentThread().setContextClassLoader(invoker.getInterface().getClassLoader());
try {
return invoker.invoke(invocation);
} finally {
// 这里重新放入最初的 ClassLoad
Thread.currentThread().setContextClassLoader(ocl);
}
}
// PS : 这里进行切换的原因猜测应该和双亲委派有关 , 整个体系的 classLoader 不能处理外部 jar 加载的类
// 当出现这种多 ClassLoader 时, 又因为双亲委派的机制 , 父加载类没有加载成功 , 又最终的 classLoader 加载了 , 这里就需要进行相关的切换
3.3 GenericFilter
GenericFilter также реализует два интерфейса: Filter, Filter.Listener, что означает, что будет обработка Response,
TODO: Основная цель класса GenericFilter не выяснена, и этот класс слишком длинный, я открою отдельную главу после того, как разберусь.
GenericFilter implements Filter, Filter.Listener
@Override
public Result invoke(Invoker<?> invoker, Invocation inv) throws RpcException {
// String $INVOKE = "$invoke";
// String $INVOKE_ASYNC = "$invokeAsync";
if ((inv.getMethodName().equals($INVOKE) || inv.getMethodName().equals($INVOKE_ASYNC))
&& inv.getArguments() != null
&& inv.getArguments().length == 3
&& !GenericService.class.isAssignableFrom(invoker.getInterface())) {
//................
}
return invoker.invoke(inv);
}
@Override
public void onResponse(Result appResponse, Invoker<?> invoker, Invocation inv) {
if ((inv.getMethodName().equals($INVOKE) || inv.getMethodName().equals($INVOKE_ASYNC))
&& inv.getArguments() != null
&& inv.getArguments().length == 3
&& !GenericService.class.isAssignableFrom(invoker.getInterface())) {
//................
}
}
@Override
public void onError(Throwable t, Invoker<?> invoker, Invocation invocation) {
}
3.4 ContextFilter
ContextFilter в основном обрабатывает контекст, а контекст хранит информацию об окружающей среде, необходимую для текущего вызывающего процесса.
static {
UNLOADING_KEYS = new HashSet<>(128);
UNLOADING_KEYS.add(PATH_KEY);
UNLOADING_KEYS.add(INTERFACE_KEY);
UNLOADING_KEYS.add(GROUP_KEY);
UNLOADING_KEYS.add(VERSION_KEY);
UNLOADING_KEYS.add(DUBBO_VERSION_KEY);
UNLOADING_KEYS.add(TOKEN_KEY);
UNLOADING_KEYS.add(TIMEOUT_KEY);
UNLOADING_KEYS.add(TIMEOUT_ATTACHMENT_KEY);
// 删除async属性以避免传递给以下调用链
UNLOADING_KEYS.add(ASYNC_KEY);
UNLOADING_KEYS.add(TAG_KEY);
UNLOADING_KEYS.add(FORCE_USE_TAG);
}
@Override
public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
Map<String, Object> attachments = invocation.getObjectAttachments();
if (attachments != null) {
Map<String, Object> newAttach = new HashMap<>(attachments.size());
// attachments 中主要为请求的元数据信息
// {"input":"265","dubbo":"2.0.2","path":"org.apache.dubbo.metadata.MetadataService","version":"1.0.0","group":"dubbo-demo-annotation-provider"}
for (Map.Entry<String, Object> entry : attachments.entrySet()) {
String key = entry.getKey();
// ["path","_TO","group","dubbo.tag","version","dubbo.force.tag","dubbo","interface","timeout","token","async"]
if (!UNLOADING_KEYS.contains(key)) {
// 收集在排除之外的属性
newAttach.put(key, entry.getValue());
}
}
attachments = newAttach;
}
// 设置 invoke 到 ServiceContext5 中
RpcContext.getServiceContext().setInvoker(invoker)
.setInvocation(invocation);
RpcContext context = RpcContext.getServerAttachment();
context.setLocalAddress(invoker.getUrl().getHost(), invoker.getUrl().getPort());
String remoteApplication = (String) invocation.getAttachment(REMOTE_APPLICATION_KEY);
if (StringUtils.isNotEmpty(remoteApplication)) {
RpcContext.getServiceContext().setRemoteApplicationName(remoteApplication);
} else {
RpcContext.getServiceContext().setRemoteApplicationName((String) context.getAttachment(REMOTE_APPLICATION_KEY));
}
// 调用超时时间 , 未配置为 -1
long timeout = RpcUtils.getTimeout(invocation, -1);
if (timeout != -1) {
// pass to next hop
RpcContext.getClientAttachment().setObjectAttachment(TIME_COUNTDOWN_KEY, TimeoutCountDown.newCountDown(timeout, TimeUnit.MILLISECONDS));
}
// 可能已经在这个过滤器之前添加了一些附件到RpcContext
if (attachments != null) {
if (context.getObjectAttachments() != null) {
// 此处会报前文的额外属性进行设置
context.getObjectAttachments().putAll(attachments);
} else {
context.setObjectAttachments(attachments);
}
}
if (invocation instanceof RpcInvocation) {
((RpcInvocation) invocation).setInvoker(invoker);
}
try {
context.clearAfterEachInvoke(false);
return invoker.invoke(invocation);
} finally {
context.clearAfterEachInvoke(true);
RpcContext.removeServerAttachment();
RpcContext.removeServiceContext();
// 对于异步场景,我们必须从当前线程中删除上下文,因此我们总是为同一线程的下一次调用创建一个新的RpcContext
RpcContext.getClientAttachment().removeAttachment(TIME_COUNTDOWN_KEY);
RpcContext.removeServerContext();
}
}
@Override
public void onResponse(Result appResponse, Invoker<?> invoker, Invocation invocation) {
// 将附件传递到结果
appResponse.addObjectAttachments(RpcContext.getServerContext().getObjectAttachments());
}
@Override
public void onError(Throwable t, Invoker<?> invoker, Invocation invocation) {
}
3.5 ExceptionFilter
ExceptionFilter в основном имеет дело с исключениями, возвращаемыми ответом.
@Override
public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
return invoker.invoke(invocation);
}
@Override
public void onResponse(Result appResponse, Invoker<?> invoker, Invocation invocation) {
if (appResponse.hasException() && GenericService.class != invoker.getInterface()) {
try {
Throwable exception = appResponse.getException();
// 如果它被检查异常,直接抛出
if (!(exception instanceof RuntimeException) && (exception instanceof Exception)) {
return;
}
// 如果异常出现在签名中,直接抛出
try {
Method method = invoker.getInterface().getMethod(invocation.getMethodName(), invocation.getParameterTypes());
Class<?>[] exceptionClasses = method.getExceptionTypes();
for (Class<?> exceptionClass : exceptionClasses) {
if (exception.getClass().equals(exceptionClass)) {
return;
}
}
} catch (NoSuchMethodException e) {
return;
}
// .. 省略 : 对于在方法签名中未找到的异常,在服务器日志中打印ERROR消息
// 如果异常类和接口类在同一个jar文件中,则直接抛出
String serviceFile = ReflectUtils.getCodeBase(invoker.getInterface());
String exceptionFile = ReflectUtils.getCodeBase(exception.getClass());
if (serviceFile == null || exceptionFile == null || serviceFile.equals(exceptionFile)) {
return;
}
// 如果是JDK异常,直接抛出
String className = exception.getClass().getName();
if (className.startsWith("java.") || className.startsWith("javax.")) {
return;
}
// 如果是 Dubbo 异常,直接抛出
if (exception instanceof RpcException) {
return;
}
// 否则,用RuntimeException包装并返回给客户机
appResponse.setException(new RuntimeException(StringUtils.toString(exception)));
} catch (Throwable e) {
}
}
}
3.6 MonitorFilter
Monitorfilter используется для мониторинга операции, он позвонит перехватку и будет собирать данные вызова об этом вызове и отправить его в центр мониторинга
// Invoke 流程
public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
if (invoker.getUrl().hasAttribute(MONITOR_KEY)) {
// private static final String MONITOR_FILTER_START_TIME = "monitor_filter_start_time";
// private static final String MONITOR_REMOTE_HOST_STORE = "monitor_remote_host_store";
invocation.put(MONITOR_FILTER_START_TIME, System.currentTimeMillis());
invocation.put(MONITOR_REMOTE_HOST_STORE, RpcContext.getServiceContext().getRemoteHost());
getConcurrent(invoker, invocation).incrementAndGet(); // count up
}
return invoker.invoke(invocation); // proceed invocation chain
}
// 主要的 Monitor 流程是以下流程
@Override
public void onResponse(Result result, Invoker<?> invoker, Invocation invocation) {
if (invoker.getUrl().hasAttribute(MONITOR_KEY)) {
collect(invoker, invocation, result,
(String) invocation.get(MONITOR_REMOTE_HOST_STORE),
(long) invocation.get(MONITOR_FILTER_START_TIME), false);
getConcurrent(invoker, invocation).decrementAndGet(); // count down
}
}
@Override
public void onError(Throwable t, Invoker<?> invoker, Invocation invocation) {
if (invoker.getUrl().hasAttribute(MONITOR_KEY)) {
collect(invoker, invocation, null, (String) invocation.get(MONITOR_REMOTE_HOST_STORE), (long) invocation.get(MONITOR_FILTER_START_TIME), true);
getConcurrent(invoker, invocation).decrementAndGet(); // count down
}
}
private void collect(Invoker<?> invoker, Invocation invocation, Result result, String remoteHost, long start, boolean error) {
try {
Object monitorUrl;
// 获取上文构建的 MONITOR_KEY
monitorUrl = invoker.getUrl().getAttribute(MONITOR_KEY);
if(monitorUrl instanceof URL) {
//
Monitor monitor = monitorFactory.getMonitor((URL) monitorUrl);
if (monitor == null) {
return;
}
// 创建数据的url
URL statisticsURL = createStatisticsUrl(invoker, invocation, result, remoteHost, start, error);
// 收集监控数据
monitor.collect(statisticsURL.toSerializableURL());
}
} catch (Throwable t) {
logger.warn("Failed to monitor count service " + invoker.getUrl() + ", cause: " + t.getMessage(), t);
}
}
// PS : 这里涉及到 MonitorService 对象 , 这里先不深入 ,只是过一下流程
C- MonitorService
- void collect(URL statistics) : 收集监测数据
- List<URL> lookup(URL query)
3.7 TimeoutFilter
Обработка тайм-аутов
@Override
public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
return invoker.invoke(invocation);
}
@Override
public void onResponse(Result appResponse, Invoker<?> invoker, Invocation invocation) {
// String TIME_COUNTDOWN_KEY = "timeout-countdown";
Object obj = RpcContext.getClientAttachment().getObjectAttachment(TIME_COUNTDOWN_KEY);
if (obj != null) {
TimeoutCountDown countDown = (TimeoutCountDown) obj;
//
if (countDown.isExpired()) {
// // 在超时的情况下清除响应
((AppResponse) appResponse).clear();
// 这里仅仅是打印一个 log
if (logger.isWarnEnabled()) {
logger.warn("invoke timed out. method: " + invocation.getMethodName() + " arguments: " +
Arrays.toString(invocation.getArguments()) + " , url is " + invoker.getUrl() +
", invoke elapsed " + countDown.elapsedMillis() + " ms.");
}
}
}
}
3.8 TraceFilter
Этот фильтр в основном отслеживает запросы. В этом объекте есть коллекция для ведения списка ключей и каналов TRACES. После завершения вызова журнала журнал будет отправлен на соответствующий канал.
В основном для прослушивания любого метода интерфейса или метода n раз
// 这里涉及到一个主要的集合
private static final ConcurrentMap<String, Set<Channel>> TRACERS = new ConcurrentHashMap<>();
@Override
public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
// 常见的获取前后调用时间的方式
long start = System.currentTimeMillis();
Result result = invoker.invoke(invocation);
long end = System.currentTimeMillis();
// 如果 trace 存在
if (TRACERS.size() > 0) {
// 构建 trace key , 为代理接口.代理方法
String key = invoker.getInterface().getName() + "." + invocation.getMethodName();
Set<Channel> channels = TRACERS.get(key);
if (channels == null || channels.isEmpty()) {
key = invoker.getInterface().getName();
channels = TRACERS.get(key);
}
// 从集合中获取对应的 channel 集合
if (CollectionUtils.isNotEmpty(channels)) {
for (Channel channel : new ArrayList<>(channels)) {
// 如果 channel 为连接或者出现异常 , 都会从集合中移除该 channel
if (channel.isConnected()) {
try {
int max = 1;
Integer m = (Integer) channel.getAttribute(TRACE_MAX);
if (m != null) {
max = m;
}
int count = 0;
AtomicInteger c = (AtomicInteger) channel.getAttribute(TRACE_COUNT);
if (c == null) {
c = new AtomicInteger();
channel.setAttribute(TRACE_COUNT, c);
}
count = c.getAndIncrement();
// 此处涉及到2个重要的参数 , 他们分别表示监听的最大次数和当前监听的次数
// private static final String TRACE_MAX = "trace.max";
// private static final String TRACE_COUNT = "trace.count";
if (count < max) {
String prompt = channel.getUrl().getParameter(Constants.PROMPT_KEY, Constants.DEFAULT_PROMPT);
channel.send("\r\n" + RpcContext.getServiceContext().getRemoteAddress() + " -> "
+ invoker.getInterface().getName()
+ "." + invocation.getMethodName()
+ "(" + JSON.toJSONString(invocation.getArguments()) + ")" + " -> " + JSON.toJSONString(result.getValue())
+ "\r\nelapsed: " + (end - start) + " ms."
+ "\r\n\r\n" + prompt);
}
if (count >= max - 1) {
channels.remove(channel);
}
} catch (Throwable e) {
channels.remove(channel);
}
} else {
channels.remove(channel);
}
}
}
}
return result;
}
// PS : 此处补充一下 Trace 的添加流程 , TraceFilter 中提供了2个方法对 Trace Channel 进行管理
public static void addTracer(Class<?> type, String method, Channel channel, int max)
- String key = method != null && method.length() > 0 ? type.getName() + "." + method : type.getName();
?- 可用注意到 , tracer key j
public static void removeTracer(Class<?> type, String method, Channel channel)
// 而 Trace 主要的管理和处理类为 TraceTelnetHandler
public class TraceTelnetHandler implements TelnetHandler {
}
Суммировать
- Цепочки фильтров загружаются через SPI
- Цепочки фильтров создаются с помощью FilterChainBuilder.
- Получить атрибуты в фильтре через invocation.getObjectAttachments()
Эта статья не очень глубокая, в основном потому, что автор все еще учится на поверхности, а во-вторых, поскольку эта статья носит общий характер, я буду постепенно углубляться в Dubbo 3.0.