1. Принцип реализации парсинга конфигурации на основе аннотаций
Инфраструктура Dubbo поддерживает несколько методов анализа конфигурации, таких как Schema/xml/annotation для анализа конфигурации Анализ конфигурации на основе аннотаций не только прост и удобен в использовании, но также более гибок и требует меньше кода. Фреймворк Spring резервирует множество интерфейсов для импорта и анализа ресурсов конфигурации, сканирования bean-компонентов и внедрения генерации и т. д. Большая часть процесса анализа аннотаций Dubbo здесь выполняется в invokeBeanFactoryPostProcessor. Платформа Dubbo в основном полагается на основные компоненты, такие как аннотация @EnableDubbo/ServiceAnotationBeanPostProcessor/ReferenceAnnotationBeanPostProcessor и сборщик конфигурации DubboConfigConfigurationSelector для завершения синтаксического анализа.Если пользователь использует файл конфигурации, платформа будет генерировать соответствующие bean-компоненты по запросу, например, при со стороны поставщика услуг.Необходимо импортировать класс, аннотированный @Service Dubbo, как bean-компонент, и внедрить прокси-объект в поле или метод, аннотированный @Reference, на стороне потребителя службы.
При запуске контейнера Spring, если @import используется в аннотации, будет запущен метод selectImports аннотации, такой как DubboConfigConfigurationRegister, указанный в аннотации EnableDubboConfig, и метод registerBeanDefinations будет вызван автоматически.
@EnableDubboConfig
@DubboComponentScan
public @interface EnableDubbo {
....
}
@Import(DubboConfigConfigurationRegistrar.class)
public @interface EnableDubboConfig {
...
}
@Import(DubboComponentScanRegistrar.class)
public @interface DubboComponentScan {
...
}
В классе обработки регистрации конфигурации DubboConfigConfigurationRegister bean-компонент DubboConfiguration регистрируется в контейнере Spring, а также регистрируются другие препроцессоры, такие как обработка аннотаций @Reference и прослушиватели событий запуска платформы.
public class DubboConfigConfigurationRegistrar implements
ImportBeanDefinitionRegistrar {
@Override
public void registerBeanDefinitions(AnnotationMetadata importingClassMetadata, BeanDefinitionRegistry registry) {
//获取注解上的配置信息
AnnotationAttributes attributes = AnnotationAttributes.fromMap(
importingClassMetadata.getAnnotationAttributes(
EnableDubboConfig.class.getName()));
boolean multiple = attributes.getBoolean("multiple");
registerBeans(registry, DubboConfigConfiguration.Single.class);
if (multiple) {
registerBeans(registry, DubboConfigConfiguration.Multiple.class);
}
//向Spring容器注入通用的bean
registerCommonBeans(registry);
}
}
static void registerCommonBeans(BeanDefinitionRegistry registry) {
//注入Reference注解前置处理器bean
registerInfrastructureBean(registry,
ReferenceAnnotationBeanPostProcessor.BEAN_NAME,
ReferenceAnnotationBeanPostProcessor.class);
.......
//注入Dubbo应用框架启动监听器bean,这里面进行Dubbo应用启动
registerInfrastructureBean(registry,
DubboBootstrapApplicationListener.BEAN_NAME,
DubboBootstrapApplicationListener.class);
.......
}
Если пользователь настраивает свойство, такое как dubbo.application.name, Spring Bean будет автоматически создан в контейнере. Привязка свойств Bean объекта регистрации и конфигурации выполняется в методе registerConfigurationBeanDefination.
public class ApplicationConfig extends AbstractConfig {
....
private String name;
private String version;
....
}
public class DubboConfigConfiguration {
@EnableConfigurationBeanBindings({
@EnableConfigurationBeanBinding(prefix = "dubbo.application",
type = ApplicationConfig.class),
.....})
public static class Single {
}
@EnableConfigurationBeanBindings({
@EnableConfigurationBeanBinding(prefix = "dubbo.applications",
type = ApplicationConfig.class, multiple = true),
.....})
public static class Multiple {
}
}
@Import(ConfigurationBeanBindingsRegister.class)
public @interface EnableConfigurationBeanBindings {
EnableConfigurationBeanBinding[] value();
}
public class ConfigurationBeanBindingsRegister
implements ImportBeanDefinitionRegistrar, EnvironmentAware {
....
@Override
public void registerBeanDefinitions(AnnotationMetadata importingClassMetadata,
BeanDefinitionRegistry registry) {
//获取注解相关属性
AnnotationAttributes attributes = AnnotationAttributes.fromMap(
importingClassMetadata.getAnnotationAttributes(
EnableConfigurationBeanBindings.class.getName()));
//获取注解相关值
AnnotationAttributes[] annotationAttributes =
attributes.getAnnotationArray("value");
ConfigurationBeanBindingRegistrar registrar =
new ConfigurationBeanBindingRegistrar();
registrar.setEnvironment(environment);
//完成属性值与配置bean的绑定
for (AnnotationAttributes element : annotationAttributes) {
registrar.registerConfigurationBeanDefinitions(element, registry);
}
}
...
}
Когда пользователь использует @DubboComponentScan, DubboComponentScanRegister активируется, и одновременно создаются два процессора, ServiceClassPostProcessor и ReferenceAnnotationBeanPostProcessor, для обработки аннотаций производства и потребления.
public class DubboComponentScanRegistrar implements ImportBeanDefinitionRegistrar {
@Override
public void registerBeanDefinitions(AnnotationMetadata importingClassMetadata,
BeanDefinitionRegistry registry) {
//扫描包路径
Set<String> packagesToScan = getPackagesToScan(importingClassMetadata);
//生成生成注解处理器
registerServiceAnnotationBeanPostProcessor(packagesToScan, registry);
//生成消费注解处理器
registerCommonBeans(registry);
}
}
Среди них процессор ServiceClassPostProcessor реализует интерфейс BeanDefinitionRegistryPostProcessor.После регистрации всех bean-компонентов в контейнере Spring вызывается метод postProcessBeanDefinitionRegistry для сканирования аннотации @DubboService и внедрения ее в контейнер.
public class ServiceClassPostProcessor
implements BeanDefinitionRegistryPostProcessor, EnvironmentAware,
ResourceLoaderAware, BeanClassLoaderAware {
@Override
public void postProcessBeanDefinitionRegistry(BeanDefinitionRegistry registry) throws BeansException {
//注入容器启动事件监听器,当收到fresh事件后,将启动Dubbo应用框架
registerBeans(registry, DubboBootstrapApplicationListener.class);
//获取用户注解配置的包扫描路径
Set<String> resolvedPackagesToScan = resolvePackagesToScan(packagesToScan);
if (!CollectionUtils.isEmpty(resolvedPackagesToScan)) {
//触发ServiceBean定义和注入
registerServiceBeans(resolvedPackagesToScan, registry);
}
}
private final static List<Class<? extends Annotation>> serviceAnnotationTypes = asList(
DubboService.class,
Service.class,
com.alibaba.dubbo.config.annotation.Service.class
);
private void registerServiceBeans(Set<String> packagesToScan, BeanDefinitionRegistry registry) {
DubboClassPathBeanDefinitionScanner scanner =
new DubboClassPathBeanDefinitionScanner(registry, environment, resourceLoader);
BeanNameGenerator beanNameGenerator = resolveBeanNameGenerator(registry);
scanner.setBeanNameGenerator(beanNameGenerator);
//指定扫描的三种注解方式,而不会扫描其他类型的注解
serviceAnnotationTypes.forEach(annotationType -> {
scanner.addIncludeFilter(new AnnotationTypeFilter(annotationType));
});
for (String packageToScan : packagesToScan) {
//将@DubboService等作为不同Bean注入容器中
scanner.scan(packageToScan);
Set<BeanDefinitionHolder> beanDefinitionHolders =
findServiceBeanDefinitionHolders(scanner, packageToScan,
registry, beanNameGenerator);
if (!CollectionUtils.isEmpty(beanDefinitionHolders)) {
for (BeanDefinitionHolder beanDefinitionHolder :
beanDefinitionHolders) {
//注册ServiceBean定义并做数据绑定和解析
registerServiceBean(beanDefinitionHolder, registry, scanner);
}
}
}
}
}
При фактическом использовании аннотация @Reference будет внедрена в класс использования, который может легко инициировать удаленные вызовы.Внедрение атрибута Dubbo обрабатывается через ReferenceAnnotationProcessor, в основном путем получения полей или методов аннотации @DubboRefence в классе, и через отражение После установки ссылки на поле или метод аннотированный объект будет преобразован в объект ReferenceBean.
public class ReferenceAnnotationBeanPostProcessor
extends AbstractAnnotationBeanPostProcessor implements
ApplicationContextAware, ApplicationListener<ServiceBeanExportedEvent> {
@Override
protected Object doGetInjectedBean(AnnotationAttributes attributes,
Object bean, String beanName,
Class<?> injectedType,
InjectionMetadata.InjectedElement injectedElement) throws Exception {
....
//注册ReferenceBean
registerReferenceBean(referencedBeanName, referenceBean, attributes,
localServiceBean, injectedType);
//缓存注入ReferenceBean
cacheInjectedReferenceBean(referenceBean, injectedElement);
//创建代理,最后返回的是Reference#getObject()
return getOrCreateProxy(referencedBeanName, referenceBean,
localServiceBean, injectedType);
}
@Override
public PropertyValues postProcessPropertyValues(
PropertyValues pvs, PropertyDescriptor[] pds, Object bean, String beanName) throws BeanCreationException {
//查找出bean所有标注了@Reference标注的字段和方法
InjectionMetadata metadata = findInjectionMetadata(beanName, bean.getClass(), pvs);
try {
//对字段或者方法进行反射绑定
metadata.inject(bean, beanName, pvs);
} catch (BeanCreationException ex) {
throw ex;
} catch (Throwable ex) {
.....
}
return pvs;
}
}
2. Принцип реализации сервисного воздействия
2.1 Механизм раскрытия услуги
Слушатель запуска Dubbo внедряется в метод DubboUtils#registerCommonBeans, а методы DubboBootstrap#start и DubboBootstrap#stop вызываются при запуске и уничтожении приложения Spring.
public class DubboBootstrapApplicationListener extends
OneTimeExecutionApplicationContextEventListener
implements Ordered {
....
private void onContextRefreshedEvent(ContextRefreshedEvent event) {
//dubbo框架应用启动
dubboBootstrap.start();
}
private void onContextClosedEvent(ContextClosedEvent event) {
//dubbo框架应用停止
dubboBootstrap.stop();
}
....
}
public DubboBootstrap start() {
....
//初始化配置和监听器,拉取注册中心的配置信息,初始化服务元数据
initialize();
//暴露服务
exportServices();
if (!isOnlyRegisterProvider() || hasExportedServices()) {
//暴露服务元数据
exportMetadataService();
//注册服务提供者实例到注册中心
registerServiceInstance();
}
referServices();
....
}
Инфраструктура Dubbo делит доступ к удаленной службе на две части: первая часть преобразует удерживаемый экземпляр службы в Invoker через прокси-сервер, а вторая часть преобразует Invoker в Exporter через определенный протокол связи, такой как Dubbo/Thrift, где Invoker представляет собой сравнение всего фреймворка. Важным компонентом является функция «соединения предыдущего и следующего», это может быть локальная реализация, удаленная реализация или кластерная реализация.
Экземпляр протокола будет автоматически адаптирован в соответствии с URL-адресом службы, и конкретный протокол будет удален, например, zookeeper.Сначала будет создан экземпляр реестра, затем будет удален конкретный URL-адрес службы, соответствующий экспорту, и, наконец, будет использоваться протокол, соответствующий URL-адресу службы (протокол по умолчанию — Dubbo) Выполнение раскрытия службы.Когда раскрытие службы будет успешным, метаданные службы будут зарегистрированы в zookeeper, а информация об URL-адресе будет следующей.
registry://127.0.0.1:2181/org.apache.dubbo.registry.RegistryService?
application=dubbo-demo-annotation-provider&dubbo=2.0.2&pid=51310
®istry=zookeeper&release=2.7.7×tamp=1613395854976
После того как ссылка на экземпляр службы преобразована в Invoker, осуществляется более детальное управление с помощью RegistryProtocol#export, например, сначала предоставляется доступ к службе, а затем регистрируются метаданные службы. Реестр последовательно выполняет следующие действия, когда служба предоставляется:
-
Делегируйте определенный протокол (Dubbo) для предоставления услуги, создайте порт прослушивания NettyServer и сохраните экземпляры службы;
-
Создать объект сервисного центра и установить TCP-соединение с реестром;
-
Зарегистрируйте метаданные службы в реестре;
-
Подпишитесь на узел конфигураторов и прослушивайте события изменения динамических атрибутов службы;
-
Завершающие работы по уничтожению сервисов, такие как закрытие портов, антирегистрация служебной информации и т.д.;
public class ServiceConfig extends ServiceConfigBase {
public synchronized void export() {
if (bootstrap == null) { bootstrap = DubboBootstrap.getInstance(); bootstrap.init(); } //初始化服务元数据 serviceMetadata.setVersion(version); serviceMetadata.setGroup(group); serviceMetadata.setDefaultGroup(group); serviceMetadata.setServiceType(getInterfaceClass()); serviceMetadata.setServiceInterfaceName(getInterface()); serviceMetadata.setTarget(getRef()); doExport();
}
частная пустота doExportUrls () { ..... //Получить экземпляр реестра, соответствующий текущему сервису Список URL-адресов реестра = ConfigValidationUtils.loadRegistries(this, true);
//如果服务指定暴露多个协议(Dubbo、REST),则依次暴露服务 for (ProtocolConfig protocolConfig : protocols) { doExportUrlsFor1Protocol(protocolConfig, registryURLs); } } private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) { ... Invoker<?> invoker = PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(EXPORT_KEY, url.toFullString())); DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this); //指定协议暴露服务,如DubboProtocol Exporter<?> exporter = PROTOCOL.export(wrapperInvoker); exporters.add(exporter); ...
}
}
public class JavassistProxyFactory extends AbstractProxyFactory { @Override public Invoker getInvoker(T proxy, Class type, URL url) { final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName() .indexOf('$') < 0 ? proxy.getClass() : type); return new AbstractProxyInvoker(proxy, type, url) { @Override protected Object doInvoke(T proxy, String methodName, Class<?>[] parameterTypes, Object[] arguments) throws Throwable { return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments); } }; } }
Получите объект конфигурации с помощью отражения и поместите его в карту для последующего построения параметров URL (таких как имя приложения и т. д.), главным образом, чтобы отличить глобальную конфигурацию, по умолчанию добавьте префикс по умолчанию перед атрибутом. framework получает параметр URl, он создаст объект Invoker через динамический прокси.На стороне сервера генерируется экземпляр AbstractProxyInvoker.Все вызовы реальных методов будут делегированы прокси, а затем прокси перенаправит вызов службе ссылка В настоящее время существует два метода прокси: JavassistProxyFactory и JdkProxyFactory. Принцип режима JavassistProxyFactory заключается в создании подкласса Wrapper, реализации метода invokeMethod в подклассе и проверке имени метода и параметров метода для каждого метода ref в теле метода. Если они совпадают, вызовите их напрямую. По сравнению с JdkProxyFactory, вызов отражения опущен.с расходами.
После создания перехватчика вызова будет вызван протокол Dubbo для предоставления услуги.Ниже приведен код DubboProtocol#export. Среди них группа служб, версия, интерфейс службы и открытый порт будут использоваться в качестве ключа для связи с конкретной службой Iovoker.Только интерфейс, открытый в первый раз, должен открыть прослушивание порта и активировать метод привязки в Exchange. , и, наконец, вызвать NettyServer для обработки.В процессе инициализации сервера многие обработчики инициализируются для поддержки пульса, обработчик для пула бизнес-потоков для обработки кодирования и декодирования и обработчик для вызова метода ответа.
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
URL url = invoker.getUrl();
//根据服务分组、版本、接口和端口构造的key
String key = serviceKey(url);
DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
//把exporter存储到单例DubboProtocol中
exporterMap.put(key, exporter);
.....
//服务初次暴露会创建监听服务器
openServer(url);
optimizeSerialization(url);
.....
return exporter;
}
private void openServer(URL url) {
String key = url.getAddress();
boolean isServer = url.getParameter(IS_SERVER_KEY, true);
if (isServer) {
//服务器实例会缓存
ProtocolServer server = serverMap.get(key);
if (server == null) {
synchronized (this) {
server = serverMap.get(key);
if (server == null) {
serverMap.put(key, createServer(url));
}
}
} else {
server.reset(url);
}
}z
}
private ProtocolServer createServer(URL url) {
....
ExchangeServer server;
try {
//创建NettyServer并初始化Handler
server = Exchangers.bind(url, requestHandler);
} catch (RemotingException e) {
....
}
...
return new DubboProtocolServer(server);
}
Когда служба действительно вызывается, будут запущены различные фильтры перехватчика. Прежде чем служба будет раскрыта, платформа инициализирует перехватчик. Dubbo автоматически внедрит ProtocolListenerWrapper при загрузке точки расширения протокола. Фактическое воздействие основано на ProtocolListenerWrapper --> ProtocolFilterWrapper --> DubboProtocol В реализации ProtocolListenerWrapper соответствующий метод прослушивателя вызывается обратно при раскрытии поставщика услуг.
public class ProtocolListenerWrapper implements Protocol {
...
@Override
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
if (UrlUtils.isRegistry(invoker.getUrl())) {
return protocol.export(invoker);
}
return new ListenerExporterWrapper<T>(protocol.export(invoker),
Collections.unmodifiableList(ExtensionLoader.getExtensionLoader(ExporterListener.class)
.getActivateExtension(invoker.getUrl(), EXPORTER_LISTENER_KEY)));
}
...
}
public class ProtocolFilterWrapper implements Protocol {
private static <T> Invoker<T> buildInvokerChain(final Invoker<T> invoker, String key, String group) {
Invoker<T> last = invoker;
//加载所有的拦截器类实例
List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group);
if (!filters.isEmpty()) {
for (int i = filters.size() - 1; i >= 0; i--) {
final Filter filter = filters.get(i);
final Invoker<T> next = last;
//将真实的Invoker放在拦截器的末尾
last = new Invoker<T>() {
@Override
public Result invoke(Invocation invocation) throws RpcException {
Result asyncResult;
//每次调用都会传递到下一个拦截器
asyncResult = filter.invoke(next, invocation);
return asyncResult.whenCompleteWithContext((r, t) -> {...});
.....
});
}
....
}
}
return last;
}
}
2.2 Регистрация службы
Общий процесс показан на рисунке ниже, а более подробная инструкция есть на официальном сайте.
- Когда поставщик услуг запустится, он запишет в реестр свою собственную информацию о метаданных и одновременно подпишется на настройку информации о метаданных;
- Когда потребитель запускается, он также записывает в реестр свои собственные метаданные и подписывается на метаданные поставщика услуг, маршрутизации и конфигурации;
- При запуске центра управления услугами он одновременно подпишется на всех потребителей, поставщиков услуг, метаданные маршрутизации и конфигурации;
- Когда поставщик услуг уходит или присоединяется новый поставщик услуг, каталог предоставления услуг в реестре изменяется, и информация об изменении будет динамически уведомляться потребителям и центру управления услугами;
- Когда потребитель инициирует вызов службы, он асинхронно сообщает о вызове и статистической информации в центр мониторинга.
Обычно мы используем zookeeper в качестве сервисного реестра, zookeeperd — это реестр древовидной структуры, и каждый узел можно разделить на постоянные узлы, постоянные последовательные узлы, временные узлы и временные последовательные узлы.
- Корневым узлом дерева является группа реестра, ниже несколько сервисных интерфейсов, группа происходит из атрибута группы, по умолчанию /dubbo;
- Под сервисным интерфейсом есть 4 типа подкаталогов, а именно провайдеры, потребители, маршрутизаторы, конфигураторы, это путь к постоянному узлу;
- Интерфейсы, содержащиеся в каталоге поставщика услуг (/dubbo/service/providers), содержат метаданные нескольких URL-адресов службы;
- Интерфейс, находящийся в каталоге потребителей услуг (/dubbo/service/consumers), содержит несколько метаданных URL-адресов потребителей;
- Каталог конфигурации маршрутизации (/dubbo/service/routers) содержит несколько метаданных URL-адресов для потребительских политик маршрутизации;
- Каталог динамической конфигурации (/dubbo/service/configurators) содержит метаданные URL динамической конфигурации нескольких служб.
2.2.1 Общий принцип реализации реестра
Подписка на услуги и публикация — одна из основных функций всего реестра.Когда существующий узел поставщика услуг отключается или новый узел поставщика услуг присоединяется к среде микросервиса, потребители, подписавшиеся на соответствующий интерфейс, могут своевременно получать уведомления. реестр и обновления локальной информации. Платформа Dubbo имеет специальный функциональный уровень реестра для реализации подписки и публикации услуг. В настоящее время Dubbo поддерживает различное промежуточное программное обеспечение, такое как Zookeeper/Redis/euraka/nacos, в качестве центра регистрации.В настоящее время мы обычно используем центр регистрации Zookeeper. Модуль реестра Dubbo использует шаблоны проектирования, такие как шаблон шаблона и шаблон фабрики, чтобы обеспечить расширяемость.
AbstractRegistryFactory реализует метод getRegistry(URL-адрес) интерфейса RegistryFactory, который в основном завершает блокировку и вызывает шаблонный метод createRegistry(URL-адрес) для создания конкретной реализации и кэширует ее в памяти. В реестре есть несколько определенных фабричных классов, таких как zookeeperRegistryFactory/RedisRegistryFactory и т. д. По умолчанию фреймворк реализует фабричный класс zookeeperRegistryFactory.
public abstract class AbstractRegistryFactory implements RegistryFactory {
@Override
public Registry getRegistry(URL url) {
url = URLBuilder.from(url)
.setPath(RegistryService.class.getName())
.addParameter(INTERFACE_KEY, RegistryService.class.getName())
.removeParameters(EXPORT_KEY, REFER_KEY)
.build();
String key = createRegistryCacheKey(url);
LOCK.lock();
try {
//缓存中有则直接返回
Registry registry = REGISTRIES.get(key);
if (registry != null) {
return registry;
}
//如果注册中心还没创建过,则调用抽象方法createRegistry(url)重新创建一个
//createRegistry方法由具体的子类如ZookeeperRegistryFactory实现
registry = createRegistry(url);
if (registry == null) {
throw new IllegalStateException("Can not create registry " + url);
}
REGISTRIES.put(key, registry);
return registry;
} finally {
LOCK.unlock();
}
}
}
public class ZookeeperRegistryFactory extends AbstractRegistryFactory {
private ZookeeperTransporter zookeeperTransporter;
public void setZookeeperTransporter(ZookeeperTransporter zookeeperTransporter) {
this.zookeeperTransporter = zookeeperTransporter;
}
@Override
public Registry createRegistry(URL url) {
//创建Zookeeper的注册中心实例
return new ZookeeperRegistry(url, zookeeperTransporter);
}
}
2.2.2 Принцип реализации публикации услуг
И поставщики услуг, и потребители должны зарегистрироваться в реестре. Код реализации, опубликованный zookeeper, очень прост: он просто вызывает zk-клиент для создания каталога в реестре.
//zkClient创建目录
zkClient.create(toUrlPath(url), url.getParameter(Constants.DYNAMIC_KEY, true));
//zkClient删除路径
zkClient.delete(toUrlPath(url);
AbstractRegistry реализует регистрацию, подписку, запрос, уведомление и другие методы в интерфейсе реестра, а также реализует такие методы, как постоянная регистрационная информация дисковых файлов. FailbackRegistry наследует AbstractRegistry, переписывает методы регистрации, подписки, запроса и уведомления родительского класса и добавляет механизм повторных попыток.
//FailbackRegistry抽象类中未实现的抽象模版方法
public abstract void doRegister(URL url);
public abstract void doUnregister(URL url);
public abstract void doSubscribe(URL url, NotifyListener listener);
public abstract void doUnsubscribe(URL url, NotifyListener listener);
Взяв за пример логику подписки, FailbackRegistry переписывает метод подписки и реализует только общую логику подписки и обработки исключений и т. д., но то, как это реализовать, передается для реализации унаследованному подклассу.Возьмем класс реализации ZookeeperRegistry как например, интерфейс API, связанный с клиентом zk, используемый в конкретной логике подписки, выполняет операции с узлами и изменяет подписки на события.
public void subscribe(URL url, NotifyListener listener) {
super.subscribe(url, listener);
removeFailedSubscribed(url, listener);
try {
//子类实现
doSubscribe(url, listener);
} catch (Exception e) {
......
}
}
}
2.2.3 Принцип реализации подписки на услуги
Обычно существует два способа подписки: получение и отправка.Во-первых, клиент периодически опрашивает реестр, чтобы получить конфигурацию, а во-вторых, реестр активно передает данные клиенту. В настоящее время Dubbo использует метод запуска извлечения в первый раз, а затем снова извлекает данные после получения событий. Когда служба открыта, сервер подпишется на конфигураторов для мониторинга динамической конфигурации.При запуске потребителя потребитель подпишется на три каталога: провайдеры, маршрутизаторы и конфигураторы, соответствующие поставщикам служб приложений, маршрутам и уведомлениям об изменении динамической конфигурации. .
Реестр zookeeper использует метод «уведомление о событии» + «вытягивание клиента».Когда клиент подключается к реестру в первый раз, он получает полный объем данных в соответствующем каталоге и регистрирует прослушиватель часов на подписанном узле. ., клиент и центр регистрации поддерживают длительное TCP-соединение. При изменении каких-либо данных в каждом узле центр регистрации будет активно уведомлять клиента по обратному вызову наблюдателя. После получения уведомления он отправит полную сумму данные в соответствующем узле. Все они извлекаются (вытягивание на стороне клиента), что отражается в интерфейсе NotifyListener#notify(List urls).
Когда клиент подключается к центру регистрации в первый раз, он получает все данные при подписке, а затем обновляет их через событие прослушивателя.Центр управления услугами будет обрабатывать подписку на все подписанные слои услуг. текущий узел, он также подписывается на все дочерние узлы этого узла.
//ZookeeperRegistry中的订阅具体逻辑
@Override
public void doSubscribe(final URL url, final NotifyListener listener) {
try {
//订阅所有的数据
if (ANY_VALUE.equals(url.getServiceInterface())) {
String root = toRootPath();
ConcurrentMap<NotifyListener, ChildListener> listeners =
zkListeners.computeIfAbsent(url, k -> new ConcurrentHashMap<>());
//第一次则会新建一个listener监听器
ChildListener zkListener =
listeners.computeIfAbsent(listener, k -> (parentPath, currentChilds) -> {
//遍历子节点,有变化则会收到通知
for (String child : currentChilds) {
child = URL.decode(child);
if (!anyServices.contains(child)) { //如果还没被订阅,则订阅
anyServices.add(child);
subscribe(url.setPath(child)
.addParameters(INTERFACE_KEY, child,
Constants.CHECK_KEY, String.valueOf(false)), k);
}
}
});
//创建持久节点,并订阅子节点
zkClient.create(root, false);
List<String> services = zkClient.addChildListener(root, zkListener);
if (CollectionUtils.isNotEmpty(services)) {
for (String service : services) {
service = URL.decode(service);
anyServices.add(service);
subscribe(url.setPath(service)
.addParameters(INTERFACE_KEY, service,
Constants.CHECK_KEY, String.valueOf(false)), listener);
}
}
} else {
List<URL> urls = new ArrayList<>();
for (String path : toCategoriesPath(url)) {
ConcurrentMap<NotifyListener, ChildListener> listeners =
zkListeners.computeIfAbsent(url, k -> new ConcurrentHashMap<>());
ChildListener zkListener =
listeners.computeIfAbsent(listener, k -> (parentPath, currentChilds) -> ZookeeperRegistry.this.notify(url, k, toUrlsWithEmpty(url, parentPath, currentChilds)));
zkClient.create(path, false);
List<String> children =
zkClient.addChildListener(path, zkListener);
if (children != null) {
urls.addAll(toUrlsWithEmpty(url, path, children));
}
}
//回调NotifyListener,更新本地缓存信息
notify(url, listener, urls);
}
} catch (Throwable e) {
throw new RpcException("Failed to subscribe " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
}
}
2.2.4 Механизм кэширования
Кэш — это обмен пространства на время.Если каждый удаленный вызов должен сначала получить список вызываемых служб из реестра, реестр будет испытывать огромную нагрузку на трафик. Каждый дополнительный сетевой запрос также снижает производительность всей сервисной сети, поэтому локальный диск и кеш памяти инкапсулируются в AbstractRegistry.Кэш памяти в основном хранится в объекте Properties, а на диске сохраняется файл, который на который ссылается объект File. Файловые файлы хранятся в каталоге /.dubbo домашнего каталога пользователя и называются в честь интерфейса службы - экземпляра службы ip:port.
Содержимое файла следующее
com.seewo.demo.service.interfaces.DemoService=empty\://192.168.0.109/
com.seewo.demo.service.interfaces.DemoService?
application\=dubbo-demo-annotation-consumer&category\=routers&check\=false
&dubbo\=2.0.2&init\=false
&interface\=com.seewo.demo.service.interfaces.DemoService
&methods\=sayHello,sayHelloAsync&pid\=88004
&release\=2.7.7&revision\=1.0-SNAPSHOT
&side\=consumer
&sticky\=false×tamp\=1613461973362
......
Когда служба инициализируется, конструктор AbstractRegistry считывает постоянные регистрационные данные из файла на локальном диске в объект Properties и загружает их в кэш памяти.
public abstract class AbstractRegistry implements Registry {
private final Properties properties = new Properties();
//内存中的监听器对象
private final ConcurrentMap<URL, Set<NotifyListener>> subscribed
= new ConcurrentHashMap<>();
//内存中的服务缓存对象
private final ConcurrentMap<URL, Map<String, List<URL>>> notified
= new ConcurrentHashMap<>();
//磁盘文件服务缓存对象
private File file;
public AbstractRegistry(URL url) {
setUrl(url);
if (url.getParameter(REGISTRY__LOCAL_FILE_CACHE_ENABLED, true)) {
//从url中指定是否为同步保存文件
syncSaveFile = url.getParameter(REGISTRY_FILESAVE_SYNC_KEY, false);
String defaultFilename = System.getProperty("user.home") +
"/.dubbo/dubbo-registry-" + url.getParameter(APPLICATION_KEY) +
"-" + url.getAddress().replaceAll(":", "-") + ".cache";
String filename = url.getParameter(FILE_KEY, defaultFilename);
File file = null;
if (ConfigUtils.isNotEmpty(filename)) {
file = new File(filename);
if (!file.exists() && file.getParentFile() != null && !file.getParentFile().exists()) {
if (!file.getParentFile().mkdirs()) {
throw new IllegalArgumentException("Invalid registry cache file " + file + ", cause: Failed to create directory " + file.getParentFile() + "!");
}
}
}
this.file = file;
loadProperties();
notify(url.getBackupUrls());
}
private void loadProperties() {
if (file != null && file.exists()) {
InputStream in = null;
try {
in = new FileInputStream(file);
properties.load(in); //读取磁盘中的文件
...
} catch (Throwable e) {
...
} finally {
...
}
}
}
}
Существует два способа сохранения кеша: синхронный и асинхронный. Асинхронный использует пул потоков для асинхронного сохранения. Если во время выполнения потока возникнет исключение, пул потоков будет вызван снова для повторной попытки.
private void saveProperties(URL url) {
...
if (syncSaveFile) {
doSaveProperties(version);
} else {
registryCacheExecutor.execute(new SaveProperties(version));
}
}
..
}
3. Принцип реализации потребления услуг
Инфраструктура Dubbo также разделена на две части для использования службы.Первый шаг заключается в создании Invoker через экземпляр удаленной службы, который он содержит.Этот Invoker является основным удаленным прокси-объектом на стороне клиента. Второй шаг преобразует Invoker через динамический прокси в ссылку динамического прокси, которая реализует пользовательский интерфейс. Здесь Invoker выполняет такие функции, как связь по сетевому соединению, вызов службы и повторная попытка.
Фактической точкой входа платформы Dubbo для ссылки на службу является ReferenceBean#getObject, который наследуется от класса ReferenceConfig. После обработки аннотатором ReferenceAnnotationBeanProcessor контейнер Spring автоматически внедрит объект ReferenceBean при использовании объекта, указанного аннотацией @Reference.
public class ReferenceBean<T> extends ReferenceConfig<T> implements FactoryBean,
ApplicationContextAware, InitializingBean, DisposableBean {
...
@Override
public void afterPropertiesSet() throws Exception {
....
if (shouldInit()) {
//调用ReferenceConfig#get方法
getObject();
}
}
....
}
Объект ReferenceBean будет сгенерирован и инициализирован в ReferenceConfig. Dubbo поддерживает одновременное использование несколькими реестрами.Если служба настроена на одновременную регистрацию нескольких реестров, она будет объединена в один Invoker в ReferenceConfig#createProxy. В методе createProxy завершено создание удаленного прокси-объекта и преобразование прокси-объекта.Клиент начинает подтягивать метаданные службы, подписываться на провайдера, менять маршрутизацию и конфигурацию.При потреблении через реестр в основном инициируется RegistryProtocol#refer Операции, такие как извлечение данных, подписка и преобразование службы Invoker.
public class ReferenceConfig<T> extends ReferenceConfigBase<T> {
....
public synchronized void init() {
//创建代理对象
ref = createProxy(map);
....
}
private T createProxy(Map<String, String> map) {
//单注册中心消费
if (urls.size() == 1) {
//进行接口与url关联并生成invoker
invoker = REF_PROTOCOL.refer(interfaceClass, urls.get(0));
} else {
List<Invoker<?>> invokers = new ArrayList<Invoker<?>>();
URL registryURL = null;
for (URL url : urls) {
//逐个获取注册中心的服务,并添加到Invoker列表中
invokers.add(REF_PROTOCOL.refer(interfaceClass, url));
if (UrlUtils.isRegistry(url)) {
registryURL = url;
}
}
if (registryURL != null) {
//通过Cluster将多个Invoker转换成一个Invoker
URL u = registryURL.addParameterIfAbsent(CLUSTER_KEY, ZoneAwareCluster.NAME);
invoker = CLUSTER.join(new StaticDirectory(u, invokers));
} else {
invoker = CLUSTER.join(new StaticDirectory(invokers));
}
}
...
// 把Invoke转换成接口代理
return (T) PROXY_FACTORY.getProxy(invoker, ProtocolUtils.isGeneric(generic));
}
}
public class JdkProxyFactory extends AbstractProxyFactory {
@Override
@SuppressWarnings("unchecked")
public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) {
return (T) Proxy.newProxyInstance(Thread.currentThread().
getContextClassLoader(), interfaces,
new InvokerInvocationHandler(invoker));
}
}
public class InvokerInvocationHandler implements InvocationHandler {
.....
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
.....
return invoker.invoke(rpcInvocation).recreate();
}
}
В методе createProxy метод RegistryProtocol#refer вызывается для запуска таких операций, как извлечение данных, подписка и преобразование службы Invoker.Основной структурой данных является RegistryDirector, которая содержит соответствие между Invoker и Url.
@Override
public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
//获取具体注册中心协议,比如zookeeper
url = getRegistryUrl(url);
//创建具体注册中心实例
Registry registry = registryFactory.getRegistry(url);
if (RegistryService.class.equals(type)) {
return proxyFactory.getInvoker((T) registry, type, url);
}
...
//处理订阅数据并通过Cluster合并多个Invoker
return doRefer(cluster, registry, type, url);
}
private <T> Invoker<T> doRefer(Cluster cluster, Registry registry,
Class<T> type, URL url) {
//持有Invoker和接收订阅通知
RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
directory.setRegistry(registry);
directory.setProtocol(protocol);
Map<String, String> parameters = new HashMap<String, String>(directory.getConsumerUrl().getParameters());
URL subscribeUrl = new URL(CONSUMER_PROTOCOL, parameters.remove(REGISTER_IP_KEY), 0, type.getName(), parameters);
if (directory.isShouldRegister()) {
directory.setRegisteredConsumerUrl(subscribeUrl);
//注册消费者信息到消费中心
registry.register(directory.getRegisteredConsumerUrl());
}
directory.buildRouterChain(subscribeUrl);
//订阅服务提供者、路由和动态配置
directory.subscribe(toSubscribeUrl(subscribeUrl));
Invoker<T> invoker = cluster.join(directory);
List<RegistryProtocolListener> listeners = findRegistryProtocolListeners(url);
if (CollectionUtils.isEmpty(listeners)) {
return invoker;
}
RegistryInvokerWrapper<T> registryInvokerWrapper = new RegistryInvokerWrapper<>(directory, cluster, invoker, subscribeUrl);
for (RegistryProtocolListener listener : listeners) {
//触发监听器
listener.onRefer(this, registryInvokerWrapper);
}
return registryInvokerWrapper;
}
Приведенная выше логика завершает создание экземпляра реестра, функции регистрации метаданных в реестре и подписки, а также выполнение преобразования протокола (например, протокола zookeeper) в соответствии с реестром, указанным пользователем. Конкретный протокол реестра будет использовать реестр для сохранения соответствующего протокола при запуске. После создания экземпляра центра URL-адрес здесь фактически является адресом центра регистрации, а реальный потребитель хранится в атрибуте refer. Метод nofity запускается в методе doSubscribe.
private Map<String, Invoker<T>> toInvokers(List<URL> urls) {
Map<String, Invoker<T>> newUrlInvokerMap = new HashMap<>();
if (urls == null || urls.isEmpty()) {
return newUrlInvokerMap;
}
Set<String> keys = new HashSet<>();
String queryProtocols = this.queryMap.get(PROTOCOL_KEY);
for (URL providerUrl : urls) {
...
//合并provider端配置数据,比如服务端IP和port等
URL url = mergeUrl(providerUrl);
String key = url.toFullString();
if (keys.contains(key)) { //如果已推送过的服务列表则忽略
continue;
}
keys.add(key);
Map<String, Invoker<T>> localUrlInvokerMap = this.urlInvokerMap;
Invoker<T> invoker = localUrlInvokerMap == null ? null : localUrlInvokerMap.get(key);
if (invoker == null) {
try {
boolean enabled = true;
if (url.hasParameter(DISABLED_KEY)) {
enabled = !url.getParameter(DISABLED_KEY, false);
} else {
enabled = url.getParameter(ENABLED_KEY, true);
}
if (enabled) {
//使用具体协议创建远程连接
invoker = new InvokerDelegate<>(
protocol.refer(serviceType, url), url, providerUrl);
}
} catch (Throwable t) {
}
if (invoker != null) { // Put new invoker in cache
newUrlInvokerMap.put(key, invoker);
}
} else {
newUrlInvokerMap.put(key, invoker);
}
}
keys.clear();
return newUrlInvokerMap;
}
Конкретное создание Invoker реализовано в DubboProtocol#refer.Когда первый потребитель инициирует подписку, будет запущено время первой операции извлечения данных, и будет запущен метод RegistryDirectory#nofity.Данные уведомления здесь представляют собой полную сумму определенного Данные, такие как данные категорий провайдеров и маршрутизаторов. Когда поставщики уведомляются о данных, преобразование Invoker выполняется в методе RegistryDirectory#toInvoker.
@Override
public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
return new AsyncToSyncInvoker<>(protocolBindingRefer(type, url));
}
public <T> Invoker<T> protocolBindingRefer(Class<T> serviceType, URL url) throws RpcException {
optimizeSerialization(url);
DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url,
getClients(url), invokers);
invokers.add(invoker);
return invoker;
}
Протокол Dubbo инициализирует объект подключения клиента перед возвратом объекта DubboInvoker.Поддерживает ли Dubbo клиент для немедленного установления TCP-соединения с удаленной службой, определяется тем, настроен ли параметр с атрибутом lazy, и все подключения выполняются по умолчанию. DubboProtocol#refer будет внутренне вызывать DubboProtocol#initClient, отвечающий за установку клиента и инициализацию обработчика.
private ExchangeClient initClient(URL url) {
.....
ExchangeClient client;
try {
//如果设置了lazzy属性,则真实调用的时候才会创建Tcp连接
if (url.getParameter(LAZY_CONNECT_KEY, false)) {
client = new LazyConnectExchangeClient(url, requestHandler);
} else {
client = Exchangers.connect(url, requestHandler);
}
} catch (RemotingException e) {
throw new RpcException("Fail to create remoting client for service(" + url + "): " + e.getMessage(), e);
}
return client;
}
}
4. Резюме
В этой главе сначала обсуждается основной процесс синтаксического анализа фреймворка Dubbo и фреймворка Spring, объединяющий аннотации, а также объясняется производство и обслуживание сервисов Dubbo, обнаружение/подписка/локальное кэширование регистрации сервисов и другие механизмы.