Говоря о процессе обнаружения регистрации Dubbo

задняя часть Dubbo
Говоря о процессе обнаружения регистрации Dubbo

Говоря о процессе регистрации и обнаружения вызовов Dubbo

предисловие

Автор всегда использовал Dubbo и Nacos в качестве основы для распределенного управления с тех пор, как он вступил в контакт с распределенной системой, и он никогда не изучал, как Dubbo выполняет регистрацию и предоставление услуг.Я давно хотел написать статью о Dubbo. У Найхе нет времени смотреть исходный код Dubbo, я изучил его за последние несколько дней, поэтому хотел бы поделиться здесь своим мнением, если у вас разные мнения, приглашаем к совместному обсуждению.

окрестности

  1. Код взят с гитхабавеб-сайт с открытым исходным кодом
  2. В этой статье используется ветка 2.6.x (3.0 слишком новая, чтобы ее читать, но общий протокол основан на версии 2.6-7, подробности см. на официальном сайте dubbo)
  3. openjdk8

Состав дуббо

Общий дизайн даббо

инструкция:

  • Синие — это интерфейсы, используемые потребителями услуг, зеленые — интерфейсы, используемые поставщиками услуг, а центральная ось — это интерфейсы или классы реализации, используемые обоими.
  • Общий дизайн Dubbo можно разделить на 10 уровней, каждый из которых зависит от одной стороны, и каждый раз верхний уровень можно отделить и использовать повторно.Служебный уровень и уровень конфигурации являются уровнем API, а остальные уровни СПИ. То есть расширение очень хорошее
  • Зеленые блоки на рисунке — это интерфейсы расширения, а синие блоки — это классы реализации На рисунке показаны только классы реализации, используемые для связывания каждого уровня.
  • Синий пунктир на рисунке — это процесс инициализации, то есть цепочка сборки при запуске, красная сплошная линия — процесс вызова метода, то есть цепочка настройки времени выполнения, а фиолетовая треугольная стрелка — наследование. следует рассматривать как тот же узел родительского класса, строка Текст выше вызывается методом.

Описание каждого слоя

  • уровень конфигурации конфигурации: Внешний интерфейс конфигурации, сосредоточенный на ServiceConfig и ReferenceConfig, вы можете напрямую инициализировать класс конфигурации или проанализировать конфигурацию с помощью Spring для создания класса конфигурации.
  • уровень прокси службы прокси: Прозрачный прокси-сервер интерфейса службы, генерирующий клиентскую заглушку и каркас службы на стороне сервера, сосредоточенный на ServiceProxy, а интерфейс расширения — ProxyFactory.
  • уровень реестра реестра: Инкапсулирует регистрацию и обнаружение адресов службы, сосредоточенных на URL-адресе службы, а интерфейсы расширения: RegistryFactory, Registry, RegistryService.
  • слой маршрутизации кластера: Инкапсулируйте маршрутизацию и балансировку нагрузки нескольких поставщиков и объедините реестр с Invoker в качестве центра и расширенными интерфейсами: Cluster, Directory, Router, LoadBalance.
  • слой мониторинга монитора: Мониторинг количества вызовов RPC и времени вызова со статистикой в ​​центре и расширенными интерфейсами: MonitorFactory, Monitor, MonitorService.
  • протокол удаленного вызова: инкапсулирует вызовы RPC, сосредоточенные на Invocation, Result, и расширяет интерфейс до Protocol, Invoker, Exporter.
  • уровень обмена информацией: инкапсулирует режим ответа на запрос, от синхронного до асинхронного, сосредоточенный на запросе и ответе, и расширенные интерфейсы: Exchanger, ExchangeChannel, ExchangeClient, ExchangeServer.
  • транспортный уровень транспортной сети: Абстрактные мина и нетти — это унифицированные интерфейсы, сосредоточенные на сообщении, а расширенные интерфейсы — это канал, транспортер, клиент, сервер, кодек.
  • сериализовать уровень сериализации данных: Некоторые повторно используемые инструменты, расширенный интерфейс Serialization, ObjectInput, ObjectOutput, ThreadPool

Инструкции по субподряду модуля

  • общий логический модуль dubbo-common: включает класс Util и универсальную модель.
  • удаленный коммуникационный модуль dubbo-remoting: Эквивалент реализации протокола Dubbo, если RPC использует протокол RMI, вам не нужно использовать этот пакет. Этот модуль включает в себя транспортный уровень и уровень обмена в 10 уровнях, которые являются основными модулями для вызовов rpc.
  • модуль удаленного вызова dubbo-rpc: Абстрактные различные протоколы, а также динамические агенты включают только вызовы один на один и не заботятся об управлении кластером. Этот модуль содержит уровни протокола и прокси в 10 слоях. На этом уровне находятся как экспозиция интерфейса, так и генерация прокси.
  • кластерный модуль dubbo-cluster: Маскировка нескольких поставщиков услуг под одного поставщика, включая: балансировку нагрузки, отказоустойчивость, маршрутизацию и т. д. Список адресов кластера может быть настроен статически или выдан реестром. Этот модуль является кластерным слоем на 10-м уровне.
  • модуль реестра dubbo-register: метод кластеризации на основе адреса, выданного центром регистрации, и абстракции различных центров регистрации.
  • модуль мониторинга dubbo-monitor: Подсчитайте количество вызовов службы, время вызова и службы отслеживания цепочки вызовов. Этот слой является слоем монитора в 10-м слое.
  • модуль конфигурации dubbo-config: это внешний API Dubbo. Пользователи могут использовать Dubbo через Config, чтобы скрыть все детали Dubbo. Этот уровень включает в себя слой конфигурации в 10 слоях.
  • контейнерный модуль dubbo-контейнера: это автономный контейнер, который загружает Spring с помощью простого основного, поскольку службам обычно не нужны функции веб-контейнеров, таких как Tomcat/JBoss, и нет необходимости использовать веб-контейнер для загрузки служб.
  • dubbo-serialization: метод сериализации вызовов rpc (в настоящее время существует 5 fastjson, fst, hessian2, jdk, kryo) по умолчанию — hessian2, этот модуль является уровнем сериализации на 10-м уровне.

Пока что все исходные модули и слои дизайна Dubbo находятся во взаимно однозначном соответствии.
Из этой информации мы можем узнать, что на момент разработки Dubbo структура модулей все еще очень ясна.

Сведения о процессе инициализации запуска проекта (производитель и потребитель)

Сервис разбора

Основываясь на конфигурации META-INF/spring.handlers в dubbo.jar, Spring будет вызывать DubboNamespaceHandler при встрече с пространством имен dubbo. Все теги dubbo анализируются единообразно с помощью DubboBeanDefinitionParser, который анализирует теги XML в объекты Bean на основе однозначного сопоставления атрибутов. При инициализации ServiceConfig.export() или ReferenceConfig.get() объект компонента преобразуется в формат URL, а все свойства компонента преобразуются в параметры URL. Затем URL-адрес передается в точку расширения протокола, на основе адаптивного механизма точки расширения, в соответствии с заголовком протокола URL-адреса выполняется предоставление службы или ссылка на различные протоколы.

Выставить сервис

  1. Без реестра и прямого предоставления поставщика формат URL-адреса, проанализированного 1.ServiceConfig, будет следующим: dubbo://service-host/com.foo.FooService?version=1.0.0. На основе адаптивного механизма точки расширения через заголовок протокола dubbo:// URL-адреса метод export() протокола DubboProtocol вызывается непосредственно для открытия порта службы.

  2. При наличии реестра и необходимости регистрации адреса провайдера 2 формат URL-адреса, анализируемого ServiceConfig, имеет следующий вид: dubbo ://service-host/com.foo.FooService?version=1.0.0"), основанный на адаптивном механизме точки расширения и идентифицируемый по заголовку протокола URL в реестре://, метод export() RegistryProtocol будет вызываться для преобразования URL-адрес поставщика в параметре экспорта сначала регистрируется в реестре. Затем повторно передайте его в точку расширения протокола для раскрытия: dubbo://service-host/com.foo.FooService?version=1.0.0, а затем на основе механизма адаптации точки расширения он идентифицируется dubbo: // заголовок протокола URL-адреса провайдера, будет вызван метод export() DubboProtocol для открытия порта сервиса.

Служба цитирования

  1. В случае отсутствия реестра и прямого подключения к провайдеру 3 формат URL-адреса, проанализированного ReferenceConfig, будет следующим: dubbo://service-host/com.foo.FooService?version=1.0.0. На основе механизма адаптации точки расширения, посредством идентификации заголовка протокола dubbo:// URL-адреса, метод refer() DubboProtocol вызывается напрямую для возврата ссылки на провайдера.

  2. Откройте для себя службы цитирования из реестра:

Когда есть реестр и адрес провайдера найден через реестр4, формат URL-адреса, проанализированного ReferenceConfig, имеет следующий вид: потребитель://потребитель-хост/com.foo.FooService?версия=1.0.0"). На основе адаптивного механизма точки расширения через заголовок протокола URL-адреса Registry:// будет вызываться метод refer() протокола RegistryProtocol, и URL-адрес поставщика будет запрашиваться на основе условий в параметре refer, таких как : dubbo://service-host/ com.foo.FooService?version=1.0.0. На основе механизма адаптации точки расширения через заголовок протокола dubbo:// URL-адреса провайдера будет вызываться метод refer() протокола DubboProtocol для получения ссылки на провайдера. Затем RegistryProtocol возвращает ссылки на несколько поставщиков через точку расширения Cluster, замаскированные под ссылку на один поставщик.

Сводка сведений об инициализации запуска проекта

Независимо от производителя или потребителя, есть реестр или нет, интерфейс протокола неотделим. То есть, начиная с производителя и потребителя, у него будет протокол, а когда реестра нет, то верхний слой можно удалить и использовать повторно, как описано выше, а для разоблачения и использования будет использоваться класс реализации DubboProtocol. справочные службы. . Когда есть реестр, согласно адаптивному механизму Dubbo, он станет RegistryProtocol как класс реализации для предоставления и ссылки на сервисы. Следовательно, обвинение протокола заключается в том, чтобы выставлять и ссылаться на сервисы, а его конкретная реализация зависит от среды, в которой используется Dubbo.
Как показано на рисунке:

    Protocol.java
    /**
     * 用于远程调用的暴露接口,也就是Invoker转化成Exporter的接口
     * 1.协议在收到请求后要记录请求源地址RpcContext.getContext().setRemoteAddress();
     * 2.export()必须是幂等的,即导出同一个URL时调用一次和调用两次没有区别
     * 3.Invoker 实例是框架传入的,协议不用管
     * @param <T>     Service type 接口类型
     * @param invoker Service invoker 接口类型转成URL再转换成的Invoker
     */
    @Adaptive
    <T> Exporter<T> export(Invoker<T> invoker) throws RpcException;

    /**
     * 引用远程服务
     * 1.当用户调用从`refer()`调用返回的`Invoker`对象的`invoke()`方法时,协议需要对应执行`Invoker`对象的`invoke()`方法 
     * 2. 协议的责任是实现从 `refer()` 返回的 `Invoker`。 一般来说,协议在 `Invoker` 实现中发送远程请求。 
     * 3、当URL中设置了check=false时,实现一定不要抛出异常,而是在连接失败时尝试恢复。
     */
    @Adaptive
    <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException;

Процесс, с помощью которого производитель предоставляет услугу

Как показано выше: Сначала класс ServiceConfig получает фактическую ссылку на класс, которая предоставляет услуги внешнему миру (например, HelloWorldImpl), а затем использует ссылку для создания экземпляра AbstractProxyInvoker с помощью метода getInvoker класса ProxyFactory. обслуживание Invoker завершено. Далее идет процесс преобразования Invoker в Exporter. Ключом к работе Dubbo с предоставлением услуг является процесс преобразования Invoker в Exporter, красная часть на рисунке выше. Ниже мы иллюстрируем реализацию двух типичных протоколов, Dubbo и RMI: Реализация Dubbo Invoker протокола Dubbo преобразуется в Exporter в методе экспорта класса DubboProtocol.Он в основном открывает службу прослушивания сокетов и получает различные запросы от клиента.Детали связи реализованы самим Dubbo.

Процесс, посредством которого потребитель ссылается на услугу

Как показано выше: Во-первых, метод init класса ReferenceConfig вызывает метод refer протокола для создания экземпляра Invoker (как показано в красной части на рисунке выше), который является ключом к потреблению службы. Затем преобразуйте Invoker в интерфейс, требуемый клиентом (например, HelloWorld). Для каждого протокола, такого как RMI/Dubbo/веб-служба, детали вызова метода refer для создания экземпляра Invoker аналогичны таковым для производителя, но шаги отличаются.

@SPI("javassist")
public interface ProxyFactory {

    /**
     * 创建代理
     *
     * @param invoker
     * @return proxy
     */
    @Adaptive({Constants.PROXY_KEY})
    <T> T getProxy(Invoker<T> invoker) throws RpcException;

    /**
     * 创建代理
     *
     * @param invoker
     * @return proxy
     */
    @Adaptive({Constants.PROXY_KEY})
    <T> T getProxy(Invoker<T> invoker, boolean generic) throws RpcException;

    /**
     * 把代理转化成Invoker
     *
     * @param <T>
     * @param proxy
     * @param type
     * @param url
     * @return invoker
     */
    @Adaptive({Constants.PROXY_KEY})
    <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) throws RpcException;
}

Из рисунка 1 видно, что интерфейс ProxyFactory использует стандартныйJavassistProxyFactory.javaКласс реализации, и этот интерфейс может использоваться как производителями, так и потребителями, но также может знатьProxyFactory::getProxyметод применим к потребителям, в то время какProxyFactory::getInvokerПрименимо к производителям.Говорят, что и производители, и потребители будут использовать Invoker, а этот метод getInvoker используется только производителями. На следующем рисунке ясно показано, почему Invokers есть как у производителей, так и у потребителей.

Анализ исходного кода сервера

Этап инициализации проекта

Процесс экспорта службы Dubbo начинается, когда контейнер Spring публикует событие обновления.Dubbo сразу же выполняет логику экспорта службы после получения события. Метод входа, экспортируемый службой, — это onApplicationEvent компонента ServiceBean. onApplicationEvent — это метод ответа на событие, который выполняет операцию экспорта службы при получении события обновления контекста Spring. Код метода следующий: в разных версиях могут быть разные записи: исходная ветка 2.6.x — это запись в onApplicationEvent, а 2.7.8 интегрирует экспозицию в Springboot.OneTimeExecutionApplicationContextEventListener.javaпосле прослушиванияDubboBootstrap.javaизstartМетод, конечно, использует ветку 2.6.x ниже

public class ServiceBean<T> extends ServiceConfig<T> implements InitializingBean, DisposableBean,
        ApplicationContextAware, ApplicationListener<ContextRefreshedEvent>, BeanNameAware,
        ApplicationEventPublisherAware {

    private transient volatile boolean exported;

    private transient volatile boolean unexported;

    @Override
    public void onApplicationEvent(ContextRefreshedEvent event) {
        // 是否有延迟导出(延迟false) && 是否已导出 && 是不是已被取消导出
        if (isDelay() && !isExported() && !isUnexported()) {
            if (logger.isInfoEnabled()) {
                logger.info("The service ready on spring started. service: " + getInterface());
            }
            export();
        }
    }

    private boolean isDelay() {
        Integer delay = getDelay();
        ProviderConfig provider = getProvider();
        if (delay == null && provider != null) {
            delay = provider.getDelay();
        }
        return supportedApplicationListener && (delay == null || delay == -1);
    }

    public synchronized void export() {
        // 当前类继承ServiceConfig.java所以会看当前
        if (provider != null) {
            if (export == null) {
                export = provider.getExport();
            }
            if (delay == null) {
                delay = provider.getDelay();
            }
        }
        if (export != null && !export) {
            return;
        }

        if (delay != null && delay > 0) {
            delayExportExecutor.schedule(new Runnable() {
                @Override
                public void run() {
                    doExport();
                }
            }, delay, TimeUnit.MILLISECONDS);
        } else {
            doExport();
        }
    }

    /**
     * 真正的出操作
     */
    protected synchronized void doExport() {
        if (unexported) {
            throw new IllegalStateException("Already unexported!");
        }
        if (exported) {
            return;
        }
        exported = true;
        // 检测 interfaceName 是否合法
        if (interfaceName == null || interfaceName.length() == 0) {
            throw new IllegalStateException("<dubbo:service interface=\"\" /> interface not allow null!");
        }
        // 检测 provider 是否为空,为空则新建一个,并通过系统变量为其初始化
        checkDefault();
        if (provider != null) {
            ...
        }
        if (module != null) {
            ...
        }
        if (application != null) {
            ...
        }
        // 检测 ref 是否为泛化服务类型
        if (ref instanceof GenericService) {
            interfaceClass = GenericService.class;
            if (StringUtils.isEmpty(generic)) {
                generic = Boolean.TRUE.toString();
            }
        } 
        // ref 非 GenericService 类型
        else {
            try {
                interfaceClass = Class.forName(interfaceName, true, Thread.currentThread()
                        .getContextClassLoader());
            } catch (ClassNotFoundException e) {
                throw new IllegalStateException(e.getMessage(), e);
            }
            // 对 interfaceClass,以及 <dubbo:method> 标签中的必要字段进行检查
            checkInterfaceAndMethods(interfaceClass, methods);
            // 对 ref 合法性进行检测
            checkRef();
            generic = Boolean.FALSE.toString();
        }
        // local 和 stub 在功能应该是一致的,用于配置本地存根
        if (local != null) {
            if ("true".equals(local)) {
                local = interfaceName + "Local";
            }
            Class<?> localClass;
            try {
                // 获取本地存根类
                localClass = ClassHelper.forNameWithThreadContextClassLoader(local);
            } catch (ClassNotFoundException e) {
                throw new IllegalStateException(e.getMessage(), e);
            }
            // 检测本地存根类是否可赋值给接口类,若不可赋值则会抛出异常,提醒使用者本地存根类类型不合法
            if (!interfaceClass.isAssignableFrom(localClass)) {
                throw new IllegalStateException("The local implementation class " + localClass.getName() + " not implement interface " + interfaceName);
            }
        }
        if (stub != null) {
            ...
        }
        // 检测各种对象是否为空,为空则新建,或者抛出异常
        checkApplication();
        checkRegistry();
        checkProtocol();
        appendProperties(this);
        checkStub(interfaceClass);
        checkMock(interfaceClass);
        if (path == null || path.length() == 0) {
            path = interfaceName;
        }
        // 导出服务
        doExportUrls();
        CodecSupport.addProviderSupportedSerialization(getUniqueServiceName(), getExportedUrls());
        // ProviderModel 表示服务提供者模型,此对象中存储了与服务提供者相关的信息。
        // 比如服务的配置信息,服务实例等。每个被导出的服务对应一个 ProviderModel。
        // ApplicationModel 持有所有的 ProviderModel。
        ProviderModel providerModel = new ProviderModel(getUniqueServiceName(), this, ref);
        ApplicationModel.initProviderModel(getUniqueServiceName(), providerModel);
    }

    /**
     * 多协议多注册中心导出服务
     * 把当前对象转换成URL.java
     */
    private void doExportUrls() {
        // 加载注册中心链接
        List<URL> registryURLs = loadRegistries(true);
        // 遍历 protocols,并在每个协议下导出服务
        for (ProtocolConfig protocolConfig : protocols) {
            // 组装URL
            doExportUrlsFor1Protocol(protocolConfig, registryURLs);
        }
    }


    /**
     * 把URL转成Invoker伪代码
     */
    private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) {
        ...

        //methods 为 MethodConfig 集合,MethodConfig 中存储了 <dubbo:method> 标签的配置信息
        if (methods != null && !methods.isEmpty()) {
            for (MethodConfig method : methods) {
                //执行method的所有操作
                ...
            } // end of methods for
        }

        // 类似刚刚的泛型判断
        if (ProtocolUtils.isGeneric(generic)) {
            ...
        } else {
            ...
        }
        if (!ConfigUtils.isEmpty(token)) {
            if (ConfigUtils.isDefault(token)) {
                map.put(Constants.TOKEN_KEY, UUID.randomUUID().toString());
            } else {
                map.put(Constants.TOKEN_KEY, token);
            }
        }
        if (Constants.LOCAL_PROTOCOL.equals(protocolConfig.getName())) {
            protocolConfig.setRegister(false);
            map.put("notify", "false");
        }
        // 暴露服务也就是URL转换成Invoker
        ...
        String scope = url.getParameter(Constants.SCOPE_KEY);
        // 如果 scope = none,则什么都不做
        if (!Constants.SCOPE_NONE.toString().equalsIgnoreCase(scope)) {
            // scope != remote,导出到本地
            if (!Constants.SCOPE_REMOTE.toString().equalsIgnoreCase(scope)) {
                exportLocal(url);
            }
            // scope != local,导出到远程
            if (!Constants.SCOPE_LOCAL.toString().equalsIgnoreCase(scope)) {
                if (logger.isInfoEnabled()) {
                    logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url);
                }
                // 有注册中心
                if (registryURLs != null && !registryURLs.isEmpty()) {
                    for (URL registryURL : registryURLs) {
                        ...
                        // 为服务提供类(ref)生成 Invoker
                        // 刚刚说了生产者的ProxyFactory获取Invoker
                        Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));
                        DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
                        // 拿到Invoker后暴露出去默认DubboProtocol
                        Exporter<?> exporter = protocol.export(wrapperInvoker);
                        exporters.add(exporter);
                    }
                } 
                // 不存在注册中心,仅导出服务
                else {

                    Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, url);
                    DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);

                    Exporter<?> exporter = protocol.export(wrapperInvoker);
                    exporters.add(exporter);
                }
            }
        }
        this.urls.add(url);
    }


    /**
     * DubboProtocol.java的export方法
     */
    public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
        URL url = invoker.getUrl();

        // export service.
        String key = serviceKey(url);
        DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
        exporterMap.put(key, exporter);

        //export an stub service for dispatching event
        Boolean isStubSupportEvent = url.getParameter(Constants.STUB_EVENT_KEY, Constants.DEFAULT_STUB_EVENT);
        Boolean isCallbackservice = url.getParameter(Constants.IS_CALLBACK_SERVICE, false);
        if (isStubSupportEvent && !isCallbackservice) {
            String stubServiceMethods = url.getParameter(Constants.STUB_EVENT_METHODS_KEY);
            if (stubServiceMethods == null || stubServiceMethods.length() == 0) {
                if (logger.isWarnEnabled()) {
                    logger.warn(new IllegalStateException("consumer [" + url.getParameter(Constants.INTERFACE_KEY) +
                            "], has set stubproxy support event ,but no stub methods founded."));
                }
            } else {
                stubServiceMethodsMap.put(url.getServiceKey(), stubServiceMethods);
            }
        }

        openServer(url);
        optimizeSerialization(url);
        return exporter;
    }
}
  • На самом деле вышеприведенный интерфейс сканируется Spring с момента запуска проекта.Преобразование интерфейса в URLЧастью является загрузка URL-адресов в ServiceConfig. Когда URL-адрес загружается, ref (фактический класс реализации) преобразуется в Invoker, а интерфейс открывается через DubboProtocol после преобразования в Invoker. На этом производитель всего процесса запуска завершен.

Анализ исходного кода потребителя

Потребители и производители на самом деле похожи, но порядок изменился: производитель получает Invoker через ref и затем предоставляет его через протокол, а потребитель

    public synchronized T get() {
        if (destroyed) {
            throw new IllegalStateException("The invoker of ReferenceConfig(" + url + ") has already destroyed!");
        }
        // 检测 ref 是否为空,为空则通过 init 方法创建
        if (ref == null) {
            // init 方法主要用于处理配置,以及调用 createProxy 生成代理类
            init();
        }
        return ref;
    }

    private void init() {
        // 避免重复初始化
        if (initialized) {
            return;
        }
        
        // ...省略大量代码粗略总结下
        // 1.主要用于检测 ConsumerConfig 实例是否存在
        // 2.这段逻辑用于从系统属性或配置文件中加载与接口名相对应的配置,并将解析结果赋值给 url 字段。url 字段的作用一般是用于点对点调用。
        String resolve = System.getProperty(interfaceName);
        // 3.用于从系统属性或配置文件中加载与接口名相对应的配置,并将解析结果赋值给 url 字段。url 字段的作用一般是用于点对点调用。
        // 4.用于检测几个核心配置类是否为空,为空则尝试从其他配置类中获取。
        // 5.主要用于收集各种配置,并将配置存储到 map 中。
        // 6.用于处理 MethodConfig 实例。该实例包含了事件通知配置,比如 onreturn、onthrow、oninvoke 等。

        // 获取服务消费者 ip 地址
        String hostToRegistry = ConfigUtils.getSystemProperty(Constants.DUBBO_IP_TO_REGISTRY);
        if (hostToRegistry == null || hostToRegistry.length() == 0) {
            hostToRegistry = NetUtils.getLocalHost();
        } else if (isInvalidLocalHost(hostToRegistry)) {
            throw new IllegalArgumentException("Specified invalid registry ip from property:" + Constants.DUBBO_IP_TO_REGISTRY + ", value:" + hostToRegistry);
        }
        map.put(Constants.REGISTER_IP_KEY, hostToRegistry);

        //attributes are stored by system context.
        StaticContext.getSystemContext().putAll(attributes);
        // 创建代理(重点)
        ref = createProxy(map);
        ConsumerModel consumerModel = new ConsumerModel(getUniqueServiceName(), this, ref, interfaceClass.getMethods());
        ApplicationModel.initConsumerModel(getUniqueServiceName(), consumerModel);
    }

    private T createProxy(Map<String, String> map) {
        URL tmpUrl = new URL("temp", "localhost", 0, map);
        final boolean isJvmRefer;
        // 这一段代码是判断是否当前类在本地,也就是所谓的本地调用
        if (isInjvm() == null) {
            // url 配置被指定,则不做本地引用
            if (url != null && url.length() > 0) { // if a url is specified, don't do local reference
                isJvmRefer = false;
            } 
            // 根据 url 的协议、scope 以及 injvm 等参数检测是否需要本地引用
            // 比如如果用户显式配置了 scope=local,此时 isInjvmRefer 返回 true
            else if (InjvmProtocol.getInjvmProtocol().isInjvmRefer(tmpUrl)) {
                // by default, reference local service if there is
                isJvmRefer = true;
            } else {
                isJvmRefer = false;
            }
        } else {
            // 获取 injvm 配置值
            isJvmRefer = isInjvm().booleanValue();
        }
        // 本地引用
        if (isJvmRefer) {
            // 生成本地引用 URL,协议为 injvm
            URL url = new URL(Constants.LOCAL_PROTOCOL, NetUtils.LOCALHOST, 0, interfaceClass.getName()).addParameters(map);
            // 调用 refer 方法构建 InjvmInvoker 实例
            // 本地引用的时候refprotocol为DubboProtocol
            invoker = refprotocol.refer(interfaceClass, url);
            if (logger.isInfoEnabled()) {
                logger.info("Using injvm service " + interfaceClass.getName());
            }
        } 
        // 远程调用
        else {
            // url不为空说明消费者想点对点调用,此url可以再DubboReference的url属性设置
            if (url != null && url.length() > 0) { // user specified URL, could be peer-to-peer address, or register center's address.
                String[] us = Constants.SEMICOLON_SPLIT_PATTERN.split(url);
                if (us != null && us.length > 0) {
                    for (String u : us) {
                        URL url = URL.valueOf(u);
                        if (url.getPath() == null || url.getPath().length() == 0) {
                            url = url.setPath(interfaceName);
                        }
                        // 检测 url 协议是否为 registry,若是,表明用户想使用指定的注册中心
                        if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
                            urls.add(url.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map)));
                        } else {
                            urls.add(ClusterUtils.mergeUrl(url, map));
                        }
                    }
                }
            } 
            // url为空说明用户连接的是注册中心
            else { // assemble URL from register center's configuration
                // 加载注册中心 url
                List<URL> us = loadRegistries(false);
                if (us != null && !us.isEmpty()) {
                    for (URL u : us) {
                        URL monitorUrl = loadMonitor(u);
                        if (monitorUrl != null) {
                            map.put(Constants.MONITOR_KEY, URL.encode(monitorUrl.toFullString()));
                        }
                        // 添加 refer 参数到 url 中,并将 url 添加到 urls 中
                        urls.add(u.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map)));
                    }
                }
                // 未配置注册中心,抛出异常
                if (urls.isEmpty()) {
                    throw new IllegalStateException("No such any registry to reference " + interfaceName + " on the consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion() + ", please config <dubbo:registry address=\"...\" /> to your spring config.");
                }
            }
            // 只有一个注册中心
            if (urls.size() == 1) {
                invoker = refprotocol.refer(interfaceClass, urls.get(0));
            } else {
                //多个注册中心
                List<Invoker<?>> invokers = new ArrayList<Invoker<?>>();
                URL registryURL = null;
                for (URL url : urls) {
                    invokers.add(refprotocol.refer(interfaceClass, url));
                    if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
                        registryURL = url; // use last registry url
                    }
                }
                // 最后会把多个invokers合并成一个,也就是通过cluster层把多个invokers合并成一个,然后具体走哪个就看消费者设置的策略
                if (registryURL != null) { // registry url is available
                    // use AvailableCluster only when register's cluster is available
                    URL u = registryURL.addParameterIfAbsent(Constants.CLUSTER_KEY, AvailableCluster.NAME);
                    invoker = cluster.join(new StaticDirectory(u, invokers));
                } else { // not a registry url
                    invoker = cluster.join(new StaticDirectory(invokers));
                }
            }
        }

        Boolean c = check;
        if (c == null && consumer != null) {
            c = consumer.isCheck();
        }
        if (c == null) {
            c = true; // default true
        }
        // 生产者检查是否可用 不可用直接报错程序启动不了使用dubbo.consumer.check=false去除
        if (c && !invoker.isAvailable()) {
            ...

        // create service proxy
        // 创建代理
        return (T) proxyFactory.getProxy(invoker);
    }

Приведенный выше поток кода примерно предназначен для проверки класса конфигурации и различной системной информации в Map с момента инициализации, а затем для получения информации о потребителе.После завершения этих предварительных операций важноrefprotocol.referиproxyFactory.getProxyДавайте объясним, что делают refer и getProxy.

Потребитель создает Invoker

  • В потребителе Invoker создается из только что упомянутого протоколаметод ссылки, а протокол включает DubboProtocol и RegistryProtocol, что означает, что Invoker создается с реестром или без него.

DubboProtocol

public class DubboProtocol extends AbstractProtocol {
    @Override
    public <T> Invoker<T> refer(Class<T> serviceType, URL url) throws RpcException {
        optimizeSerialization(url);
        // create rpc invoker.
        DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers);
        invokers.add(invoker);
        return invoker;
    }

    /**
     * getClients。这个方法用于获取客户端实例
     */
    private ExchangeClient[] getClients(URL url) {
        // whether to share connection
        boolean service_share_connect = false;
        int connections = url.getParameter(Constants.CONNECTIONS_KEY, 0);
        // if not configured, connection is shared, otherwise, one connection for one service
        if (connections == 0) {
            service_share_connect = true;
            connections = 1;
        }

        ExchangeClient[] clients = new ExchangeClient[connections];
        for (int i = 0; i < clients.length; i++) {
            if (service_share_connect) {
                clients[i] = getSharedClient(url);
            } else {
                clients[i] = initClient(url);
            }
        }
        return clients;
    }
}

RegistryProtocol

public class RegistryProtocol implements Protocol {
    
    @Override
    @SuppressWarnings("unchecked")
    public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
        // 取 registry 参数值,并将其设置为协议头
        url = url.setProtocol(url.getParameter(Constants.REGISTRY_KEY, Constants.DEFAULT_REGISTRY)).removeParameter(Constants.REGISTRY_KEY);
        //获取注册中心实例
        Registry registry = registryFactory.getRegistry(url);
        if (RegistryService.class.equals(type)) {
            return proxyFactory.getInvoker((T) registry, type, url);
        }

        // url转成Map
        Map<String, String> qs = StringUtils.parseQueryString(url.getParameterAndDecoded(Constants.REFER_KEY));
        // group的分组也就是@DubboReference(group=)
        String group = qs.get(Constants.GROUP_KEY);
        if (group != null && group.length() > 0) {
            // 合并两个group也就是走到了cluster路由层让这一层选择分发哪个group
            if ((Constants.COMMA_SPLIT_PATTERN.split(group)).length > 1
                    || "*".equals(group)) {
                return doRefer(getMergeableCluster(), registry, type, url);
            }
        }
        return doRefer(cluster, registry, type, url);
    }

    private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
        //创建实例
        RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
        //设置注册中心实例
        directory.setRegistry(registry);
        directory.setProtocol(protocol);
        // all attributes of REFER_KEY
        Map<String, String> parameters = new HashMap<String, String>(directory.getUrl().getParameters());
        // 生成服务消费者链接
        URL subscribeUrl = new URL(Constants.CONSUMER_PROTOCOL, parameters.remove(Constants.REGISTER_IP_KEY), 0, type.getName(), parameters);
        // 注册服务消费者,在 consumers 目录下新节点
        if (!Constants.ANY_VALUE.equals(url.getServiceInterface())
                && url.getParameter(Constants.REGISTER_KEY, true)) {
            URL registeredConsumerUrl = getRegisteredConsumerUrl(subscribeUrl, url);
            registry.register(registeredConsumerUrl);
            directory.setRegisteredConsumerUrl(registeredConsumerUrl);
        }
        //当前消费者发送订阅的消息给注册中心,注册中心会把关注的生产者动态notify到消费者
        directory.subscribe(subscribeUrl.addParameter(Constants.CATEGORY_KEY,
                Constants.PROVIDERS_CATEGORY
                        + "," + Constants.CONFIGURATORS_CATEGORY
                        + "," + Constants.ROUTERS_CATEGORY));

        // 一个注册中心可能有多个服务提供者,因此这里需要将多个服务提供者合并为一个
        Invoker invoker = cluster.join(directory);
        ProviderConsumerRegTable.registerConsumer(invoker, url, subscribeUrl, directory);
        return invoker;
    }
}

Продюсер останавливается

  • Вы, должно быть, видели много разрушений прокси, когда производитель отключается, но они не являются фактическим классом реализации.
/**
     * Get proxy.
     *
     * @param cl  class loader.
     * @param ics interface class array.
     * @return Proxy instance.
     */
    public static Proxy getProxy(ClassLoader cl, Class<?>... ics) {
        if (ics.length > 65535)
            throw new IllegalArgumentException("interface limit exceeded");

        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < ics.length; i++) {
            String itf = ics[i].getName();
            if (!ics[i].isInterface())
                throw new RuntimeException(itf + " is not a interface.");

            Class<?> tmp = null;
            try {
                tmp = Class.forName(itf, false, cl);
            } catch (ClassNotFoundException e) {
            }

            if (tmp != ics[i])
                throw new IllegalArgumentException(ics[i] + " is not visible from class loader");

            sb.append(itf).append(';');
        }

        // use interface class name list as key.
        String key = sb.toString();

        // get cache by class loader.
        // 这里获取类加载的Map,如果没有就新建一个类加载器
        // ProxyCacheMap使用的是WeakHashMap,这个类里面的Entry是弱引用,内存不够不至于dubbo撑爆内存
        Map<String, Object> cache;
        synchronized (ProxyCacheMap) {
            cache = ProxyCacheMap.get(cl);
            if (cache == null) {
                cache = new HashMap<String, Object>();
                ProxyCacheMap.put(cl, cache);
            }
        }

        Proxy proxy = null;
        synchronized (cache) {
            do {
                ...从缓存拿,没有就算了
            }
        }

        long id = PROXY_CLASS_COUNTER.getAndIncrement();
        String pkg = null;
        ClassGenerator ccp = null, ccm = null;
        try {
            ccp = ClassGenerator.newInstance(cl);

            Set<String> worked = new HashSet<String>();
            List<Method> methods = new ArrayList<Method>();

            for (int i = 0; i < ics.length; i++) {
                if (!Modifier.isPublic(ics[i].getModifiers())) {
                    String npkg = ics[i].getPackage().getName();
                    if (pkg == null) {
                        pkg = npkg;
                    } else {
                        if (!pkg.equals(npkg))
                            throw new IllegalArgumentException("non-public interfaces from different packages");
                    }
                }
                ccp.addInterface(ics[i]);

                for (Method method : ics[i].getMethods()) {
                    String desc = ReflectUtils.getDesc(method);
                    if (worked.contains(desc))
                        continue;
                    worked.add(desc);

                    int ix = methods.size();
                    Class<?> rt = method.getReturnType();
                    Class<?>[] pts = method.getParameterTypes();

                    StringBuilder code = new StringBuilder("Object[] args = new Object[").append(pts.length).append("];");
                    for (int j = 0; j < pts.length; j++)
                        code.append(" args[").append(j).append("] = ($w)$").append(j + 1).append(";");
                    code.append(" Object ret = handler.invoke(this, methods[" + ix + "], args);");
                    if (!Void.TYPE.equals(rt))
                        code.append(" return ").append(asArgument(rt, "ret")).append(";");

                    methods.add(method);
                    ccp.addMethod(method.getName(), method.getModifiers(), rt, pts, method.getExceptionTypes(), code.toString());
                }
            }

            if (pkg == null)
                pkg = PACKAGE_NAME;

            // create ProxyInstance class.
            String pcn = pkg + ".proxy" + id;
            ccp.setClassName(pcn);
            ccp.addField("public static java.lang.reflect.Method[] methods;");
            ccp.addField("private " + InvocationHandler.class.getName() + " handler;");
            ccp.addConstructor(Modifier.PUBLIC, new Class<?>[]{InvocationHandler.class}, new Class<?>[0], "handler=$1;");
            ccp.addDefaultConstructor();
            Class<?> clazz = ccp.toClass();
            clazz.getField("methods").set(null, methods.toArray(new Method[0]));

            // create Proxy class.
            String fcn = Proxy.class.getName() + id;
            ccm = ClassGenerator.newInstance(cl);
            ccm.setClassName(fcn);
            ccm.addDefaultConstructor();
            ccm.setSuperClass(Proxy.class);
            ccm.addMethod("public Object newInstance(" + InvocationHandler.class.getName() + " h){ return new " + pcn + "($1); }");
            Class<?> pc = ccm.toClass();
            proxy = (Proxy) pc.newInstance();
        } catch (RuntimeException e) {
            throw e;
        } catch (Exception e) {
            throw new RuntimeException(e.getMessage(), e);
        } finally {
            // release ClassGenerator
            if (ccp != null)
                ccp.release();
            if (ccm != null)
                ccm.release();
            synchronized (cache) {
                if (proxy == null)
                    cache.remove(key);
                else
                    cache.put(key, new WeakReference<Proxy>(proxy));
                cache.notifyAll();
            }
        }
        return proxy;
    }
  • Вышеупомянутый процесс взят из кеша, и если вы не можете его получить, вы можете сгенерировать его самостоятельно.Если вы можете разобраться в ccp и ccm, то легко запутаетесь.ccp - это производство прокси-классов для классов реализации , а ccm — это прокси, который генерирует классы прокси.

возникшие проблемы

Я никогда не понимал, почему каждый раз, когда производитель перезагружается, он сообщает, что нет доступного производителя, о чем вам говорят следующие две картинки.

  • На следующем рисунке показано, когда потребитель запускается, и реестр сообщает потребителю, что производитель может вернуться и обновить Invoker. Конкретное обновление — это только что упомянутый класс ExchangeClient. Вы можете увидеть свойства этого класса в Invoker. Когда потребитель вызывает производителя, он возвращается, чтобы определить, есть ли у клиента производитель, и если нет, то будет сообщено об исключении rpc. Когда потребитель уведомлен, он возвращается к созданию клиента, но когда реестр уведомит потребителя, этот процесс зависит от загрузки реестра, он будет уведомлен в ближайшее время, когда он простаивает или количество подключений мало.

  • На следующем рисунке показано, что центр регистрации немедленно уведомляет потребителей, когда производитель деактивирован. Это мгновение, когда я тестирую здесь локально, но автор не уверен. Я могу пойти посмотреть, как Nacos обрабатывает производителя в автономном режиме. Когда уведомление переходит в автономный режим, клиент в вызывающем также будет очищен.В это время вызов, естественно, rpc ненормальный. Таким образом, во время непрерывного выпуска вы можете подождать 1 минуту, прежде чем деактивировать другой узел.

резюме

С момента использования Dubbo до сих пор я видел его исходный код и нашел много деталей, которых я не понимал или не понимал раньше.Я медленно закончил процесс и почувствовал, что его дизайн был довольно мощным, но уродливым и простым конвертировать все звонки в форму url.Выставлять информацию, но dubbo обычно является внутренним звонком, и не имеет значения, если его не увидит внешняя сеть. Если вам нужна более подробная информация о звонках, вы можете изучить ее позже.