Разговор о Dubbo (7): Практика создания пользовательских фильтров

задняя часть сервер Алибаба Dubbo

0 Предисловие

При нынешнем тренде микросервисов в процессе одного вызова задействовано несколько сервисных нод, а сгенерированные логи распределяются по разным серверам.Хотя технология ELK может использоваться для агрегации разбросанных логов в es, как эти логи могут запускаться через Up является ключевым вопросом.

Если вам нужно просмотреть полный журнал ссылок звонка, общий подход заключается в созданииtraceIdПередача в последующую цепочку вызова обслуживанияtraceId, последующая служба будет по-прежнему использоватьсяtraceIdРаспечатайте журнал и передайте его другим последующим службамtraceId, этот процесс сокращен,прозрачная передача traceId.

В системе, использующей протокол HTTP в качестве служебного протокола, инкапсулированный HTTP-клиент может использоваться для прозрачной передачи traceId. Но реализация прозрачной передачи traceId в dubbo немного сложнее. Согласно предыдущему разделу"☆Поговорим о Dubbo (6): основной принцип цепочки исходного кода-фильтра", в целом,Будет настроен фильтр для достижения прозрачной передачи traceId, но есть еще две специальные реализации:(1) Повторно реализовать связанные классы внутри dubbo (2) На основе реализации RpcContext;

1 Реализация на основе перезаписи

1.1 анализ исходного кода

Dubbo Consumer与Provider调用过程

ProxyDubbo Javassist используется для создания динамического конца экземпляра потребительского обслуживания прокси-продажи.

ImplementЭто экземпляр реализации службы на стороне провайдера.

Идентификатор трассировки передается прозрачно, т. е. прокси и агрегат должны иметь одинаковый идентификатор трассировки.. Даббо обладает хорошими характеристиками слоистости,Объект транспорта — RPCInvocation.

Следовательно, ключевая логическая реализация перезаписи заключается в том, что прокси помещает traceId в RPCInvocation и отправляет его клиенту для сериализации и передачи TCP.

Ниже приведен потребительский конецJavassistProxyFactoryАнализ кода для:

public class JavassistProxyFactory extends AbstractProxyFactory {

    /**
     * Spring容器启动时,该代理工厂类方法会为Consumer生成Service代理类
     * invoker和interfaces都是从Spring配置文件中读取出来 
     */
    @SuppressWarnings("unchecked")
    public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) {
        // 生成Service代理类的每个方法的字节码,都调用了InvokerInvocationHandler.invoke(...)方法,
        // 做实际RpcInvocation包装、序列化、TCP传输、反序列化结果
        return (T) Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker));
    }

    public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
        // TODO Wrapper类不能正确处理带$的类名
        final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);
        return new AbstractProxyInvoker<T>(proxy, type, url) {
            @Override
            protected Object doInvoke(T proxy, String methodName, 
                                      Class<?>[] parameterTypes, 
                                      Object[] arguments) throws Throwable {
                return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
            }
        };
    }
}

Ниже представлена ​​потребительская сторона.InvokerInvocationHandlerАнализ кода для:

public class InvokerInvocationHandler implements InvocationHandler {

    private final Invoker<?> invoker;
    
    public InvokerInvocationHandler(Invoker<?> handler){
        this.invoker = handler;
    }

    /**
     * 真正调用RPC时,各个Service代理的字节码里调用了这个通用的invoke
     * proxy就是之前生成的代理对象,第二个参数是方法名,第三个参数是参数列表
     * 知道了(1)哪个接口(2)哪个方法(3)参数是什么,就完全可以映射到Provider端实现并获取返回值
     */
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        String methodName = method.getName();
        Class<?>[] parameterTypes = method.getParameterTypes();
        if (method.getDeclaringClass() == Object.class) {
            return method.invoke(invoker, args);
        }
        if ("toString".equals(methodName) && parameterTypes.length == 0) {
            return invoker.toString();
        }
        if ("hashCode".equals(methodName) && parameterTypes.length == 0) {
            return invoker.hashCode();
        }
        if ("equals".equals(methodName) && parameterTypes.length == 1) {
            return invoker.equals(args[0]);
        }
        // 因为到这里,还是consumer端的业务线程,所以在这里取ThreadLocal里的traceId,
        // 再放入RpcInvocation的attachment,那么Provider就可以从收到的RpcInvocation实例取出透传的traceId
        return invoker.invoke(new RpcInvocation(method, args)).recreate();
    }
}

Ниже представлена ​​сторона провайдера.DubboProtocolАнализ кода для:

    private ExchangeHandler requestHandler = new ExchangeHandlerAdapter() {
        
        public Object reply(ExchangeChannel channel, Object message) throws RemotingException {
            if (message instanceof Invocation) {
                Invocation inv = (Invocation) message;
                Invoker<?> invoker = getInvoker(channel, inv);
                //如果是callback 需要处理高版本调用低版本的问题
                if (Boolean.TRUE.toString().equals(inv.getAttachments().get(IS_CALLBACK_SERVICE_INVOKE))){
                    String methodsStr = invoker.getUrl().getParameters().get("methods");
                    boolean hasMethod = false;
                    if (methodsStr == null || methodsStr.indexOf(",") == -1){
                        hasMethod = inv.getMethodName().equals(methodsStr);
                    } else {
                        String[] methods = methodsStr.split(",");
                        for (String method : methods){
                            if (inv.getMethodName().equals(method)){
                                hasMethod = true;
                                break;
                            }
                        }
                    }
                    if (!hasMethod){
                        logger.warn(new IllegalStateException("The methodName "+inv.getMethodName()+" not found in callback service interface ,invoke will be ignored. please update the api interface. url is:" + invoker.getUrl()) +" ,invocation is :"+inv );
                        return null;
                    }
                }

                // Provider收到报文之后,从线程池中取出一个线程,反序列化出RpcInvocation、并调用实现类的对应方法
                // 所以,此处就是Provider端的实现类的线程,取出traceId,放入ThreadLocal中
                RpcContext.getContext().setRemoteAddress(channel.getRemoteAddress());
                return invoker.invoke(inv);
            }
            throw new RemotingException(channel, "Unsupported request: " + message == null ? null : (message.getClass().getName() + ": " + message) + ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress());
        }

1.2 Конкретная реализация

package com.alibaba.dubbo.rpc.proxy;

/**
 * traceId工具类这个类是新添加的
 */
public class TraceIdUtil {

    private static final ThreadLocal<String> TRACE_ID = new ThreadLocal<String>();

    public static String getTraceId() {
        return TRACE_ID.get();
    }

    public static void setTraceId(String traceId) {
        TRACE_ID.set(traceId);
    }

}

/**
 * InvokerHandler 这个类 是修改的
 */
public class InvokerInvocationHandler implements InvocationHandler {

    private final Invoker<?> invoker;

    public InvokerInvocationHandler(Invoker<?> handler){
        this.invoker = handler;
    }

    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        String methodName = method.getName();
        Class<?>[] parameterTypes = method.getParameterTypes();
        if (method.getDeclaringClass() == Object.class) {
            return method.invoke(invoker, args);
        }
        if ("toString".equals(methodName) && parameterTypes.length == 0) {
            return invoker.toString();
        }
        if ("hashCode".equals(methodName) && parameterTypes.length == 0) {
            return invoker.hashCode();
        }
        if ("equals".equals(methodName) && parameterTypes.length == 1) {
            return invoker.equals(args[0]);
        }
        // 这里将cosumer 端的traceId放入RpcInvocation
        RpcInvocation rpcInvocation = new RpcInvocation(method, args);
        rpcInvocation.setAttachment("traceId", TraceIdUtil.getTraceId());
        return invoker.invoke(rpcInvocation).recreate();
    }

}


package com.alibaba.dubbo.rpc.protocol.dubbo;

/**
 * dubbo protocol support 重新实现DubboProtocol
 *
 */
public class DubboProtocol extends AbstractProtocol {


    private ExchangeHandler requestHandler = new ExchangeHandlerAdapter() {

        public Object reply(ExchangeChannel channel, Object message) throws RemotingException {
            if (message instanceof Invocation) {
                Invocation inv = (Invocation) message;
                Invoker<?> invoker = getInvoker(channel, inv);
                //如果是callback 需要处理高版本调用低版本的问题
                if (Boolean.TRUE.toString().equals(inv.getAttachments().get(IS_CALLBACK_SERVICE_INVOKE))){
                    String methodsStr = invoker.getUrl().getParameters().get("methods");
                    boolean hasMethod = false;
                    if (methodsStr == null || methodsStr.indexOf(",") == -1){
                        hasMethod = inv.getMethodName().equals(methodsStr);
                    } else {
                        String[] methods = methodsStr.split(",");
                        for (String method : methods){
                            if (inv.getMethodName().equals(method)){
                                hasMethod = true;
                                break;
                            }
                        }
                    }
                    if (!hasMethod){
                        logger.warn(new IllegalStateException("The methodName "+inv.getMethodName()+" not found in callback service interface ,invoke will be ignored. please update the api interface. url is:" + invoker.getUrl()) +" ,invocation is :"+inv );
                        return null;
                    }
                }
                RpcContext.getContext().setRemoteAddress(channel.getRemoteAddress());
                // 这里将收到的consumer端的traceId放入provider端的thread local
                TraceIdUtil.setTraceId(inv.getAttachment("traceId"));
                return invoker.invoke(inv);
            }
            throw new RemotingException(channel, "Unsupported request: " + message == null ? null : (message.getClass().getName() + ": " + message) + ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress());
        }
    }
}

2 внедрение на основе RPCContext

Прежде чем объяснять решение пользовательского фильтра для прозрачной передачи traceId, давайте сначала изучим объект RpcContext.Его RpcContext по существу представляет собой объект ThreadLocal, который поддерживает контекстную информацию взаимодействия rpc..

/*
 * Copyright 1999-2011 Alibaba Group.
 *  
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *  
 *      http://www.apache.org/licenses/LICENSE-2.0
 *  
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.alibaba.dubbo.rpc;

import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import com.alibaba.dubbo.common.Constants;
import com.alibaba.dubbo.common.URL;
import com.alibaba.dubbo.common.utils.NetUtils;

/**
 * Thread local context. (API, ThreadLocal, ThreadSafe)
 * 
 * 注意:RpcContext是一个临时状态记录器,当接收到RPC请求,或发起RPC请求时,RpcContext的状态都会变化。
 * 比如:A调B,B再调C,则B机器上,在B调C之前,RpcContext记录的是A调B的信息,在B调C之后,RpcContext记录的是B调C的信息。
 * 
 * @see com.alibaba.dubbo.rpc.filter.ContextFilter
 * @author qian.lei
 * @author william.liangf
 * @export
 */
public class RpcContext {
	
	private static final ThreadLocal<RpcContext> LOCAL = new ThreadLocal<RpcContext>() {
		@Override
		protected RpcContext initialValue() {
			return new RpcContext();
		}
	};

	/**
	 * get context.
	 * 
	 * @return context
	 */
	public static RpcContext getContext() {
	    return LOCAL.get();
	}
	
	/**
	 * remove context.
	 * 
	 * @see com.alibaba.dubbo.rpc.filter.ContextFilter
	 */
	public static void removeContext() {
	    LOCAL.remove();
	}

    private Future<?> future;

    private List<URL> urls;

    private URL url;

    private String methodName;

    private Class<?>[] parameterTypes;

    private Object[] arguments;

	private InetSocketAddress localAddress;

	private InetSocketAddress remoteAddress;

    private final Map<String, String> attachments = new HashMap<String, String>();

    private final Map<String, Object> values = new HashMap<String, Object>();

    // now we don't use the 'values' map to hold these objects
    // we want these objects to be as generic as possible
    private Object request;
    private Object response;

	@Deprecated
    private List<Invoker<?>> invokers;
    
	@Deprecated
    private Invoker<?> invoker;

	@Deprecated
    private Invocation invocation;
    
	protected RpcContext() {
	}

    /**
     * Get the request object of the underlying RPC protocol, e.g. HttpServletRequest
     *
     * @return null if the underlying protocol doesn't provide support for getting request
     */
    public Object getRequest() {
        return request;
    }

    /**
     * Get the request object of the underlying RPC protocol, e.g. HttpServletRequest
     *
     * @return null if the underlying protocol doesn't provide support for getting request or the request is not of the specified type
     */
    @SuppressWarnings("unchecked")
    public <T> T getRequest(Class<T> clazz) {
        return (request != null && clazz.isAssignableFrom(request.getClass())) ? (T) request : null;
    }


    public void setRequest(Object request) {
        this.request = request;
    }

    /**
     * Get the response object of the underlying RPC protocol, e.g. HttpServletResponse
     *
     * @return null if the underlying protocol doesn't provide support for getting response
     */
    public Object getResponse() {
        return response;
    }

    /**
     * Get the response object of the underlying RPC protocol, e.g. HttpServletResponse
     *
     * @return null if the underlying protocol doesn't provide support for getting response or the response is not of the specified type
     */
    @SuppressWarnings("unchecked")
    public <T> T getResponse(Class<T> clazz) {
        return (response != null && clazz.isAssignableFrom(response.getClass())) ? (T) response : null;
    }

    public void setResponse(Object response) {
        this.response = response;
    }

    /**
     * is provider side.
     * 
     * @return provider side.
     */
    public boolean isProviderSide() {
        URL url = getUrl();
        if (url == null) {
            return false;
        }
        InetSocketAddress address = getRemoteAddress();
        if (address == null) {
            return false;
        }
        String host;
        if (address.getAddress() == null) {
            host = address.getHostName();
        } else {
            host = address.getAddress().getHostAddress();
        }
        return url.getPort() != address.getPort() || 
                ! NetUtils.filterLocalHost(url.getIp()).equals(NetUtils.filterLocalHost(host));
    }

    /**
     * is consumer side.
     * 
     * @return consumer side.
     */
    public boolean isConsumerSide() {
        URL url = getUrl();
        if (url == null) {
            return false;
        }
        InetSocketAddress address = getRemoteAddress();
        if (address == null) {
            return false;
        }
        String host;
        if (address.getAddress() == null) {
            host = address.getHostName();
        } else {
            host = address.getAddress().getHostAddress();
        }
        return url.getPort() == address.getPort() && 
                NetUtils.filterLocalHost(url.getIp()).equals(NetUtils.filterLocalHost(host));
    }

    /**
     * get future.
     * 
     * @param <T>
     * @return future
     */
    @SuppressWarnings("unchecked")
    public <T> Future<T> getFuture() {
        return (Future<T>) future;
    }

    /**
     * set future.
     * 
     * @param future
     */
    public void setFuture(Future<?> future) {
        this.future = future;
    }

    public List<URL> getUrls() {
        return urls == null && url != null ? (List<URL>) Arrays.asList(url) : urls;
    }

    public void setUrls(List<URL> urls) {
        this.urls = urls;
    }

    public URL getUrl() {
        return url;
    }

    public void setUrl(URL url) {
        this.url = url;
    }

    /**
     * get method name.
     * 
     * @return method name.
     */
    public String getMethodName() {
        return methodName;
    }

    public void setMethodName(String methodName) {
        this.methodName = methodName;
    }

    /**
     * get parameter types.
     * 
     * @serial
     */
    public Class<?>[] getParameterTypes() {
        return parameterTypes;
    }

    public void setParameterTypes(Class<?>[] parameterTypes) {
        this.parameterTypes = parameterTypes;
    }

    /**
     * get arguments.
     * 
     * @return arguments.
     */
    public Object[] getArguments() {
        return arguments;
    }

    public void setArguments(Object[] arguments) {
        this.arguments = arguments;
    }

    /**
     * set local address.
     * 
     * @param address
     * @return context
     */
	public RpcContext setLocalAddress(InetSocketAddress address) {
	    this.localAddress = address;
	    return this;
	}

	/**
	 * set local address.
	 * 
	 * @param host
	 * @param port
	 * @return context
	 */
    public RpcContext setLocalAddress(String host, int port) {
        if (port < 0) {
            port = 0;
        }
        this.localAddress = InetSocketAddress.createUnresolved(host, port);
        return this;
    }

	/**
	 * get local address.
	 * 
	 * @return local address
	 */
	public InetSocketAddress getLocalAddress() {
		return localAddress;
	}

	public String getLocalAddressString() {
        return getLocalHost() + ":" + getLocalPort();
    }
    
	/**
	 * get local host name.
	 * 
	 * @return local host name
	 */
	public String getLocalHostName() {
		String host = localAddress == null ? null : localAddress.getHostName();
		if (host == null || host.length() == 0) {
		    return getLocalHost();
		}
		return host;
	}

    /**
     * set remote address.
     * 
     * @param address
     * @return context
     */
    public RpcContext setRemoteAddress(InetSocketAddress address) {
        this.remoteAddress = address;
        return this;
    }
    
    /**
     * set remote address.
     * 
     * @param host
     * @param port
     * @return context
     */
    public RpcContext setRemoteAddress(String host, int port) {
        if (port < 0) {
            port = 0;
        }
        this.remoteAddress = InetSocketAddress.createUnresolved(host, port);
        return this;
    }

	/**
	 * get remote address.
	 * 
	 * @return remote address
	 */
	public InetSocketAddress getRemoteAddress() {
		return remoteAddress;
	}
	
	/**
	 * get remote address string.
	 * 
	 * @return remote address string.
	 */
	public String getRemoteAddressString() {
	    return getRemoteHost() + ":" + getRemotePort();
	}
	
	/**
	 * get remote host name.
	 * 
	 * @return remote host name
	 */
	public String getRemoteHostName() {
		return remoteAddress == null ? null : remoteAddress.getHostName();
	}

    /**
     * get local host.
     * 
     * @return local host
     */
    public String getLocalHost() {
        String host = localAddress == null ? null : 
            localAddress.getAddress() == null ? localAddress.getHostName() 
                    : NetUtils.filterLocalHost(localAddress.getAddress().getHostAddress());
        if (host == null || host.length() == 0) {
            return NetUtils.getLocalHost();
        }
        return host;
    }

    /**
     * get local port.
     * 
     * @return port
     */
    public int getLocalPort() {
        return localAddress == null ? 0 : localAddress.getPort();
    }

    /**
     * get remote host.
     * 
     * @return remote host
     */
    public String getRemoteHost() {
        return remoteAddress == null ? null : 
            remoteAddress.getAddress() == null ? remoteAddress.getHostName() 
                    : NetUtils.filterLocalHost(remoteAddress.getAddress().getHostAddress());
    }

    /**
     * get remote port.
     * 
     * @return remote port
     */
    public int getRemotePort() {
        return remoteAddress == null ? 0 : remoteAddress.getPort();
    }

    /**
     * get attachment.
     * 
     * @param key
     * @return attachment
     */
    public String getAttachment(String key) {
        return attachments.get(key);
    }

    /**
     * set attachment.
     * 
     * @param key
     * @param value
     * @return context
     */
    public RpcContext setAttachment(String key, String value) {
        if (value == null) {
            attachments.remove(key);
        } else {
            attachments.put(key, value);
        }
        return this;
    }

    /**
     * remove attachment.
     * 
     * @param key
     * @return context
     */
    public RpcContext removeAttachment(String key) {
        attachments.remove(key);
        return this;
    }

    /**
     * get attachments.
     * 
     * @return attachments
     */
    public Map<String, String> getAttachments() {
        return attachments;
    }

    /**
     * set attachments
     * 
     * @param attachment
     * @return context
     */
    public RpcContext setAttachments(Map<String, String> attachment) {
        this.attachments.clear();
        if (attachment != null && attachment.size() > 0) {
            this.attachments.putAll(attachment);
        }
        return this;
    }
    
    public void clearAttachments() {
        this.attachments.clear();
    }

    /**
     * get values.
     * 
     * @return values
     */
    public Map<String, Object> get() {
        return values;
    }

    /**
     * set value.
     * 
     * @param key
     * @param value
     * @return context
     */
    public RpcContext set(String key, Object value) {
        if (value == null) {
            values.remove(key);
        } else {
            values.put(key, value);
        }
        return this;
    }

    /**
     * remove value.
     * 
     * @param key
     * @return value
     */
    public RpcContext remove(String key) {
        values.remove(key);
        return this;
    }

    /**
     * get value.
     * 
     * @param key
     * @return value
     */
    public Object get(String key) {
        return values.get(key);
    }

    public RpcContext setInvokers(List<Invoker<?>> invokers) {
        this.invokers = invokers;
        if (invokers != null && invokers.size() > 0) {
            List<URL> urls = new ArrayList<URL>(invokers.size());
            for (Invoker<?> invoker : invokers) {
                urls.add(invoker.getUrl());
            }
            setUrls(urls);
        }
        return this;
    }

    public RpcContext setInvoker(Invoker<?> invoker) {
        this.invoker = invoker;
        if (invoker != null) {
            setUrl(invoker.getUrl());
        }
        return this;
    }

    public RpcContext setInvocation(Invocation invocation) {
        this.invocation = invocation;
        if (invocation != null) {
            setMethodName(invocation.getMethodName());
            setParameterTypes(invocation.getParameterTypes());
            setArguments(invocation.getArguments());
        }
        return this;
    }

    /**
     * @deprecated Replace to isProviderSide()
     */
    @Deprecated
    public boolean isServerSide() {
        return isProviderSide();
    }
    
    /**
     * @deprecated Replace to isConsumerSide()
     */
    @Deprecated
    public boolean isClientSide() {
        return isConsumerSide();
    }
    
    /**
     * @deprecated Replace to getUrls()
     */
    @Deprecated
    @SuppressWarnings({ "unchecked", "rawtypes" })
    public List<Invoker<?>> getInvokers() {
        return invokers == null && invoker != null ? (List)Arrays.asList(invoker) : invokers;
    }

    /**
     * @deprecated Replace to getUrl()
     */
    @Deprecated
    public Invoker<?> getInvoker() {
        return invoker;
    }

    /**
     * @deprecated Replace to getMethodName(), getParameterTypes(), getArguments()
     */
    @Deprecated
    public Invocation getInvocation() {
        return invocation;
    }
    
    /**
     * 异步调用 ,需要返回值,即使步调用Future.get方法,也会处理调用超时问题.
     * @param callable
     * @return 通过future.get()获取返回结果.
     */
    @SuppressWarnings("unchecked")
	public <T> Future<T> asyncCall(Callable<T> callable) {
    	try {
	    	try {
	    		setAttachment(Constants.ASYNC_KEY, Boolean.TRUE.toString());
				final T o = callable.call();
				//local调用会直接返回结果.
				if (o != null) {
					FutureTask<T> f = new FutureTask<T>(new Callable<T>() {
						public T call() throws Exception {
							return o;
						}
					});
					f.run();
					return f;
				} else {
					
				}
			} catch (Exception e) {
				throw new RpcException(e);
			} finally {
				removeAttachment(Constants.ASYNC_KEY);
			}
    	} catch (final RpcException e) {
			return new Future<T>() {
				public boolean cancel(boolean mayInterruptIfRunning) {
					return false;
				}
				public boolean isCancelled() {
					return false;
				}
				public boolean isDone() {
					return true;
				}
				public T get() throws InterruptedException, ExecutionException {
					throw new ExecutionException(e.getCause());
				}
				public T get(long timeout, TimeUnit unit)
						throws InterruptedException, ExecutionException,
						TimeoutException {
					return get();
				}
			};
		}
    	return ((Future<T>)getContext().getFuture());
    }
    
	/**
	 * oneway调用,只发送请求,不接收返回结果.
	 * @param callable
	 */
	public void asyncCall(Runnable runable) {
    	try {
    		setAttachment(Constants.RETURN_KEY, Boolean.FALSE.toString());
    		runable.run();
		} catch (Throwable e) {
			//FIXME 异常是否应该放在future中?
			throw new RpcException("oneway call error ." + e.getMessage(), e);
		} finally {
			removeAttachment(Constants.RETURN_KEY);
		}
    }
}

Примечание:Информация о вложениях в RpcContext будет заполнена в объекте RpcInvocation и передана вместе..

Поэтому некоторые люди предполагают, что traceId можно просто внедрить в RpcContext, чтобы можно было легко реализовать прозрачную передачу traceId., так ли это, давайте сначала попрактикуемся вместе.

Определите класс интерфейса Dubbo:

public interface IEchoService {
    String echo(String name);
}

Напишите серверный код (провайдер):

@Service("echoService")
public class EchoServiceImpl implements IEchoService {
 
    @Override
    public String echo(String name) {
        String traceId = RpcContext.getContext().getAttachment("traceId");
        System.out.println("name = " + name + ", traceId = " + traceId);
        return name;
    }
 
    public static void main(String[] args) {
        ClassPathXmlApplicationContext applicationContext =
                new ClassPathXmlApplicationContext("spring-dubbo-test-producer.xml");
 
        System.out.println("server start");
        while (true) {
            try {
                Thread.sleep(1000L);
            } catch (InterruptedException e) {
            }
        }
    } 
}

Напишите код клиента (Consumer):

public class EchoServiceConsumer {
 
    public static void main(String[] args) {
        ClassPathXmlApplicationContext applicationContext =
                new ClassPathXmlApplicationContext("spring-dubbo-test-consumer.xml");
 
        IEchoService service = (IEchoService) applicationContext
                .getBean("echoService");
 
        // *) 设置traceId
        RpcContext.getContext().setAttachment("traceId", "100001");
        System.out.println(RpcContext.getContext().getAttachments());
        // *) 第一调用
        service.echo("lilei");
 
        // *) 第二次调用
        System.out.println(RpcContext.getContext().getAttachments());
        service.echo("hanmeimei");
    }
}

Результат выполнения следующий:

服务端输出:
name = lilei, traceId = 100001
name = hanmeimei, traceId = null
 
客户端输出:
{traceId=100001}
{}

Из выходной информации сервера мы можем приятно удивиться, обнаружив, что traceId действительно передается, но только в первый раз, а не во второй раз. Вывод контента от клиента в RpcContext также подтверждает это явление, при этом существенная причина этого явления в том, чтоПриложение RPCContext Object В взаимодействии после очистки RPC.

Для метода clearAttachments RpcContext установите точку останова для воспроизведения Мы можем найти следующий стек вызовов:

java.lang.Thread.State: RUNNABLE
    at com.alibaba.dubbo.rpc.RpcContext.clearAttachments(RpcContext.java:438)
    at com.alibaba.dubbo.rpc.filter.ConsumerContextFilter.invoke(ConsumerContextFilter.java:50)
    at com.alibaba.dubbo.rpc.protocol.ProtocolFilterWrapper$1.invoke(ProtocolFilterWrapper.java:91)
    at com.alibaba.dubbo.rpc.protocol.InvokerWrapper.invoke(InvokerWrapper.java:53)
    at com.alibaba.dubbo.rpc.cluster.support.FailoverClusterInvoker.doInvoke(FailoverClusterInvoker.java:77)
    at com.alibaba.dubbo.rpc.cluster.support.AbstractClusterInvoker.invoke(AbstractClusterInvoker.java:227)
    at com.alibaba.dubbo.rpc.cluster.support.wrapper.MockClusterInvoker.invoke(MockClusterInvoker.java:72)
    at com.alibaba.dubbo.rpc.proxy.InvokerInvocationHandler.invoke(InvokerInvocationHandler.java:52)
    at com.alibaba.dubbo.common.bytecode.proxy0.echo(proxy0.java:-1)
    at com.test.dubbo.EchoServiceConsumer.main(EchoServiceConsumer.java:20)

Самый прямой вызов — это ConsumerContextFilter, который поставляется с Dubbo, давайте проанализируем его код:

@Activate(
    group = {"consumer"},
    order = -10000
)
public class ConsumerContextFilter implements Filter {
    public ConsumerContextFilter() {
    }
 
    public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
        RpcContext.getContext().setInvoker(invoker).setInvocation(invocation)
                .setLocalAddress(NetUtils.getLocalHost(), 0)
                .setRemoteAddress(invoker.getUrl().getHost(), invoker.getUrl().getPort());
        if(invocation instanceof RpcInvocation) {
            ((RpcInvocation)invocation).setInvoker(invoker);
        }
 
        Result var3;
        try {
            var3 = invoker.invoke(invocation);
        } finally {
            RpcContext.getContext().clearAttachments();
        }
 
        return var3;
    }
}

Действительно, во фрагменте кода finallyМы обнаружили, что RpcContext очищает объект вложения после каждого вызова rpc..

Теперь, когда мы нашли основную причину, решение,Вы можете сбрасывать traceid при каждом вызове., например так (выглядит немного некрасиво в еде):

// *) 第一调用
RpcContext.getContext().setAttachment("traceId", "100001");
service.echo("lilei");
 
// *) 第二次调用
RpcContext.getContext().setAttachment("traceId", "100001");
service.echo("hanmeimei");

3 Реализация на основе фильтра

Сначала введите класс инструмента:

public class TraceIdUtils {
 
    private static final ThreadLocal<String> traceIdCache
            = new ThreadLocal<String>();
 
    public static String getTraceId() {
        return traceIdCache.get();
    }
 
    public static void setTraceId(String traceId) {
        traceIdCache.set(traceId);
    }
 
    public static void clear() {
        traceIdCache.remove();
    }
 
}

Затем мы определяем класс фильтра:

package com.test.dubbo;
 
public class TraceIdFilter implements Filter {
 
    @Override
    public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
        String traceId = RpcContext.getContext().getAttachment("traceId");
        if ( !StringUtils.isEmpty(traceId) ) {
            // *) 从RpcContext里获取traceId并保存
            TraceIdUtils.setTraceId(traceId);
        } else {
            // *) 交互前重新设置traceId, 避免信息丢失
            RpcContext.getContext().setAttachment("traceId", TraceIdUtils.getTraceId());
        }
        // *) 实际的rpc调用
        return invoker.invoke(invocation);
    }
}

В каталоге ресурсов добавьте каталог META-INF/dubbo, а затем добавьте файл com.alibaba.dubbo.rpc.Filter:

META-INF/dubbo/com.alibaba.dubbo.rpc.Filter文件

Отредактируйте содержимое (файл com.alibaba.dubbo.rpc.Filter) следующим образом:

traceIdFilter=com.test.dubbo.TraceIdFilter

Затем мы настраиваем соответствующие элементы фильтра как для производителя, так и для потребителя dubbo:

服务端:
<dubbo:service interface="com.test.dubbo.IEchoService" ref="echoService" version="1.0.0"
        filter="traceIdFilter"/>

客户端:
<dubbo:reference interface="com.test.dubbo.IEchoService" id="echoService" version="1.0.0"
        filter="traceIdFilter"/>

Код теста на стороне сервера изменен на следующий:

@Service("echoService")
public class EchoServiceImpl implements IEchoService {
 
    @Override
    public String echo(String name) {
        String traceId = TraceIdUtils.getTraceId();
        System.out.println("name = " + name + ", traceId = " + traceId);
        return name;
    }
 
}

Фрагмент тестового кода для клиента:

// *) 第一调用
RpcContext.getContext().setAttachment("traceId", "100001");
service.echo("lilei");
 
// *) 第二次调用
service.echo("hanmeimei");

Тот же код, результаты теста следующие:

服务端输出:
name = lilei, traceId = 100001
name = hanmeimei, traceId = 100001
 
客户端输出:
{traceId=100001}
{}

В соответствии с ожиданиями, я чувствую, что эта схема очень элегантна.Вложение RpcContext по-прежнему пусто (ConsumerContextFilter выполняется после пользовательского фильтра)Но перед каждым взаимодействием RPC TraceID будет вновь введен в эксплуатацию для обеспечения успешного пропускания с помощью подсказки для отслеживания.