Серия исходных кодов SpringCloud (1) — инициализация запуска центра регистрации Eureka

Spring Cloud
Серия исходных кодов SpringCloud (1) — инициализация запуска центра регистрации Eureka

EurekaдаNetflixКомпонент регистрации и обнаружения службы с открытым исходным кодом компании, наряду с другими компонентами службы Netflix (такими как балансировка нагрузки, автоматические выключатели, шлюзы и т. д.), интегрированы Spring Cloud какSpring Cloud Netflixмодуль. ноEureka 2.0Это закрытый исходный код, но 1.x все еще находится на обслуживании, и его можно продолжать использовать. В этой статье мы узнаем больше о реестре Eureka, чтобы лучше использовать и настраивать реестр.

О версии. Облачная версия Spring, используемая в этой статье, — Hoxton.SR8, загрузочная версия Spring — 2.3.3.RELEASE, а зависимая версия eureka — 1.9.25.

Эврика первый опыт

Эврика делится наEureka Serverа такжеEureka Client, Eureka Server — это реестр Eureka, а Eureka Client — это клиент Eureka. В этом разделе мы сначала создадим полку реестра через демонстрацию и взглянем на инфраструктуру реестра.

Eureka Server

1. Создайте службу реестра:sunny-register

Сначала создайте проект maven, имя службы — solar-register, и введите зависимость сервера реестра в pom.xml.

<dependencies>
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-netflix-eureka-server</artifactId>
    </dependency>
</dependencies>

2. Добавьте файл конфигурации

Добавить в раздел "Ресурсы"application.ymlфайл конфигурации и добавьте конфигурацию, связанную с реестром.

server:
  port: 8000
spring:
  application:
    name: sunny-register

eureka:
  instance:
    hostname: dev.lyyzoo.com
  client:
    # 是否向注册中心注册自己
    register-with-eureka: false
    # 是否检索服务
    fetch-registry: false
    service-url:
      defaultZone: http://${eureka.instance.hostname}:${server.port}/eureka/

3. Добавьте класс запуска

Добавьте класс запуска и добавьте его в класс запуска@EnableEurekaServerАннотация для включения реестра.

package com.lyyzoo.sunny.register;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.netflix.eureka.server.EnableEurekaServer;

@EnableEurekaServer
@SpringBootApplication
public class RegisterApplication {

    public static void main(String[] args) {
        SpringApplication.run(RegisterApplication.class, args);
    }
}

4. Запустите центр регистрации

После запуска реестра посетитеhttp://dev.lyyzoo.com:8000/, вы можете увидеть страницу центра регистрации, и ни один экземпляр еще не зарегистрирован. (dev.lyyzoo.com отображается на 127.0.0.1 в файле локальных хостов)

Eureka Client

Создайте два демонстрационных сервиса,demo-producerСервис предоставляет интерфейс как производитель,demo-consumerСлужба действует как потребитель для вызова интерфейса демо-производителя.

1. Создайте клиентскую службу:demo-producer

Создайте проект maven, имя службы для демо-производителя, зависимое от внесения реестра клиентов в pom.xml и добавьте веб-зависимость.

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>

2. Добавьте файл конфигурации

добавить в ресурсыapplication.ymlФайл конфигурации, добавьте конфигурацию, связанную с клиентом реестра.

server:
  port: 8010
spring:
  application:
    name: demo-producer

eureka:
  client:
    serviceUrl:
      defaultZone: ${EUREKA_DEFAULT_ZONE:http://dev.lyyzoo.com:8000/eureka}

3. Добавьте класс запуска

Добавьте класс запуска и добавьте его в класс запуска@EnableEurekaClientАннотация для включения клиента.

@EnableEurekaClient
@SpringBootApplication
public class ProducerApplication {

    public static void main(String[] args) {
        SpringApplication.run(ProducerApplication.class, args);
    }
}

4. Добавьте интерфейс отдыха

Добавьте интерфейс для тестовых вызовов:

@RestController
public class DemoController {

    private final Logger logger = LoggerFactory.getLogger(getClass());

    @GetMapping("/v1/uuid")
    public ResponseEntity<String> getUUID() {
        String uuid = UUID.randomUUID().toString();
        logger.info("generate uuid: {}", uuid);
        return ResponseEntity.ok(uuid);
    }
}

5. Создайте обслуживание клиентов:demo-consumer

Аналогичным образом создайте сервис-потребитель: demo-producer, добавьте к этому сервису интерфейс-потребитель, черезRestTemplateМетод балансировки нагрузки для вызова интерфейса демо-производителя.

Поэтому вам нужно сначала настроить RestTemplate с балансировкой нагрузки:

@EnableEurekaClient
@SpringBootApplication
public class ConsumerApplication {

    @Bean
    @LoadBalanced
    public RestTemplate restTemplate() {
        return new RestTemplate();
    }

    public static void main(String[] args) {
        SpringApplication.run(ConsumerApplication.class, args);
    }
}

Добавьте потребительский интерфейс. Обратите внимание, что URL-адрес — это написанное имя службы, а не конкретный IP-адрес или порт. В сценарии с микрослужбой невозможно указать конкретный адрес при вызове между службами.

@RestController
public class DemoController {

    private final Logger logger = LoggerFactory.getLogger(getClass());

    @Autowired
    private RestTemplate restTemplate;

    @GetMapping("/v1/id")
    public ResponseEntity<String> getId() {
        ResponseEntity<String> result = restTemplate.getForEntity("http://demo-producer/v1/uuid", String.class);
        String uuid = result.getBody();
        logger.info("request id: {}", uuid);
        return ResponseEntity.ok(uuid);
    }
}

6. Запустите клиент реестра

Запустите демо-производитель с двумя разными портами, и вы можете указать порты с помощью переменных среды. Затем снова запустите демо-потребитель.

После завершения запуска вы можете увидеть два экземпляра демо-производителя и один экземпляр демо-потребителя, зарегистрированные в реестре, со статусом UP.

7. Тестовый интерфейс

Вызовите интерфейс потребительской службы и получите к нему доступ несколько раз.http://dev.lyyzoo.com:8020/v1/idвы обнаружите, что консоли двух экземпляров демо-потребителя службы-производителя будут поочередно выводить информацию журнала. Это означает, что клиент-потребитель обращается к производителю через имя службы.

Эврика инфраструктура

Через предыдущий опыт вы можете обнаружить, что звонок услуг должен знать только имя сервиса, и не нужно указывать определенный IP-адрес и порт. Как это сделал?

Нетрудно заметить, что инфраструктура EUREKA содержит три роли:

  • 服务注册中心: Eureka Server, предоставляющий функции регистрации и обнаружения служб.
  • 服务提供者: Eureka Client, предоставляющий услуги (также может выступать в роли потребителя)
  • 服务消费者: Eureka Client, потребительский сервис (сам может быть и провайдером)

Во-первых, требуется центр регистрации службы, и клиент регистрируется в центре регистрации и отправляет свою собственную информацию (например, имя службы, IP-адрес службы, информацию о порте и т. д.) в центр регистрации. Клиент получает информацию о списке регистрации услуг из центра регистрации, и этот список включает в себя всю информацию об услугах, зарегистрированную в центре регистрации. После получения информации о списке регистрации службы клиентская служба может найти все экземпляры службы в соответствии с именем службы, а затем выбрать один из экземпляров с помощью балансировки нагрузки, а затем вызвать API-интерфейс службы в соответствии с его IP-адресом и информация о порте.

Это самая основная структура и функция реестра, которая обеспечивает регистрацию и обнаружение служб, а также предоставляет сведения о списке регистрации служб для каждого клиента. Но для выполнения этих задач у Eureka есть много механизмов для достижения и обеспечения высокой доступности, таких как регистрация службы, продление службы, получение списка регистрации службы, автономная служба, отбраковка службы и т. д. Eureka также предоставляет множество параметров, чтобы мы могли оптимизировать некоторые из ее функций и конфигураций в соответствии с реальными сценариями, такими как время поддержания пульса, интервал для извлечения реестра и механизм самозащиты. Давайте проанализируем эти функции и параметры эврики на исходном уровне эврики, поймем ее принципы и изучим некоторые ее конструкции.

Подготовка исходного кода Eureka

Хотя мыpom.xmlзависит отspring-cloud-starter-netflix-eureka-serverа такжеspring-cloud-starter-netflix-eureka-client,ноspring-cloud-starter-netflixОн просто инкапсулирует eureka, чтобы его можно было запустить и инициализировать с помощью springboot.Нижний слой на самом деле является netflix.eureka-core、eureka-clientЖдать. Итак, давайте сначала проанализируем исходный код netflix eureka и, наконец, посмотрим на исходный код spring-cloud-starter-netflix.

Подготовка исходной среды

1. Загрузите исходный код

Клонируйте исходный код eureka на локальный:

$ git clone https://github.com/Netflix/eureka.git

Поскольку мы полагаемся на1.9.25версия, после локального клонирования кода переключите ее на 1.9.25:

$ git checkout -b 1.9.25

Затем перейдите в корневой каталог eureka, чтобы выполнить команду сборки:

$ ./gradlew clean build -x test

2. IDEA открывает исходный код

Поскольку эврика используетgradleЧтобы управлять зависимостями, вам нужно установить Gradle локально, а затем вам нужно установить плагин Gradle в IDEA, который похож на maven.Учебник по установке может быть Baidu.

Инженерная структура Эврика

Эврика в основном включает в себя следующие модули:

  • eureka-client: эврика клиент
  • eureka-core: сервер eureka, основная функция реестра
  • eureka-resources: консоль Eureka на основе jsp, вы можете просмотреть, какие экземпляры службы зарегистрированы
  • eureka-server: центр реестра, который объединяет eureka-client, eureka-core и eureka-resources. Поскольку он опирается на eureka-client, eureka-server также является клиентом. В режиме кластера eureka server eureka-server также будет зарегистрирован как клиента в другие реестры
  • eureka-examples: пример эврики
  • eureka-test-utils: инструмент модульного тестирования eureka
  • eureka-core|client-jersey2: инкапсуляция фреймворка джерси, джерси похожа на spring mvc, поддерживает http restful-запросы, а связь между eureka-client и eureka-server основана на фреймворке jersey.

Инициализация запуска сервера Eureka

Первое, на что следует обратить внимание, этоeureka-server` после активации реестра клиент может зарегистрировать службу и обнаружить ее.

eureka-серверный модуль

1. директория eureka-сервера

  • Каталог ресурсов - это в основном файлы конфигурации клиента и сервера eureka.
  • Есть один под веб-приложениемweb.xmlФайл конфигурации, который настраивает запись для запуска инициализации, также можно увидеть из этого, eureka-server будет упакован в военный пакет для запуска.
  • Тестируемый класс модульного тестаEurekaClientServerRestIntegrationTest, который включает модульные тесты, такие как регистрация службы, обновление и автономный режим, мы можем запускать эти модульные тесты для отладки кода.

2. веб.xml

Содержимое web.xml:

<?xml version="1.0" encoding="UTF-8"?>
<web-app version="2.5"
         xmlns="http://java.sun.com/xml/ns/javaee"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://java.sun.com/xml/ns/javaee
    http://java.sun.com/xml/ns/javaee/web-app_2_5.xsd">
  <!-- eureka 启动初始化类 -->
  <listener>
    <listener-class>com.netflix.eureka.EurekaBootStrap</listener-class>
  </listener>

  <!-- 状态过滤器 -->
  <filter>
    <filter-name>statusFilter</filter-name>
    <filter-class>com.netflix.eureka.StatusFilter</filter-class>
  </filter>

  <!-- 认证过滤器 -->
  <filter>
    <filter-name>requestAuthFilter</filter-name>
    <filter-class>com.netflix.eureka.ServerRequestAuthFilter</filter-class>
  </filter>

  <!-- 限流过滤器 -->
  <filter>
    <filter-name>rateLimitingFilter</filter-name>
    <filter-class>com.netflix.eureka.RateLimitingFilter</filter-class>
  </filter>
  <filter>
    <filter-name>gzipEncodingEnforcingFilter</filter-name>
    <filter-class>com.netflix.eureka.GzipEncodingEnforcingFilter</filter-class>
  </filter>

  <!-- jersey 容器 -->
  <filter>
    <filter-name>jersey</filter-name>
    <filter-class>com.sun.jersey.spi.container.servlet.ServletContainer</filter-class>
    <init-param>
      <param-name>com.sun.jersey.config.property.WebPageContentRegex</param-name>
      <param-value>/(flex|images|js|css|jsp)/.*</param-value>
    </init-param>
    <init-param>
      <param-name>com.sun.jersey.config.property.packages</param-name>
      <param-value>com.sun.jersey;com.netflix</param-value>
    </init-param>

  <filter-mapping>
    <filter-name>statusFilter</filter-name>
    <url-pattern>/*</url-pattern>
  </filter-mapping>

  <filter-mapping>
    <filter-name>requestAuthFilter</filter-name>
    <url-pattern>/*</url-pattern>
  </filter-mapping>

  <filter-mapping>
    <filter-name>jersey</filter-name>
    <url-pattern>/*</url-pattern>
  </filter-mapping>

  <!-- 欢迎页 -->
  <welcome-file-list>
    <welcome-file>jsp/status.jsp</welcome-file>
  </welcome-file-list>

</web-app>

В Web.xml можно найти следующую информацию:

  • Когда сервер eureka запускается, он сначала проходитcom.netflix.eureka.EurekaBootStrapкласс для выполнения работы, связанной с инициализацией запуска.
  • настроенStatusFilter(фильтр состояния сервера),ServerRequestAuthFilter(фильтр аутентификации),RateLimitingFilter(Limiting Filter) и другие фильтры, но RateLimitingFilter по умолчанию не включен.
  • Контейнер сервлета, сконфигурированный с помощью джерси, на самом деле такой же, как у Springframework.DispatcherServletАналогично, используется для перехвата и обработки http restful-запросов, нам не нужно уделять этому слишком много внимания.
  • Наконец, страница приветствия сервера eureka также настроена какjsp/status.jspстраница, эта страница находится в модуле eureka-resources, который представляет собой страницу консоли eureka, показанную ранее.

3. Класс модульного тестированияEurekaClientServerRestIntegrationTest

Первый взглядsetUpметод, каждый тестовый пример будет запускать метод setUp для инициализации среды выполнения перед запуском.

@BeforeClass
public static void setUp() throws Exception {
    // 初始化 eureka 配置
    injectEurekaConfiguration();
    // 启动 eureka server,会找 build/libs 目录下的 eureka-server.*.war 包来运行
    // 这一步启动时,就会加载 web.xm 配置文件,然后进入 EurekaBootStrap 初始化类
    startServer();
    // eureka server 配置
    createEurekaServerConfig();

    // 创建 jersey 客户端,使用 jersey 客户端来调用资源
    httpClientFactory = JerseyEurekaHttpClientFactory.newBuilder()
            .withClientName("testEurekaClient")
            .withConnectionTimeout(1000)
            .withReadTimeout(1000)
            .withMaxConnectionsPerHost(1)
            .withMaxTotalConnections(1)
            .withConnectionIdleTimeout(1000)
            .build();

    jerseyEurekaClient = httpClientFactory.newClient(new DefaultEndpoint(eurekaServiceUrl));

    ServerCodecs serverCodecs = new DefaultServerCodecs(eurekaServerConfig);
    jerseyReplicationClient = JerseyReplicationClient.createReplicationClient(
            eurekaServerConfig,
            serverCodecs,
            eurekaServiceUrl
    );
}

Этот класс предоставляет несколько тестовых случаев, мы можем запускать эти тестовые примеры для отладки.

Инициализация EurekaBootStrap

EurekaBootStrapявляется входом слушателя, который реализуетServletContextListenerИнтерфейс в основном завершает запуск и инициализацию сервера eureka.

отcontextInitializedМетод входит, в целом, он делится на инициализацию среды eureka и инициализацию контекста сервера eureka.

@Override
public void contextInitialized(ServletContextEvent event) {
    try {
        // eureka 环境初始化
        initEurekaEnvironment();
        // eureka server 上下文初始化
        initEurekaServerContext();

        ServletContext sc = event.getServletContext();
        sc.setAttribute(EurekaServerContext.class.getName(), serverContext);
    } catch (Throwable e) {
        logger.error("Cannot bootstrap eureka server :", e);
        throw new RuntimeException("Cannot bootstrap eureka server :", e);
    }
}

1. Инициализация среды Эврика

initEurekaEnvironmentСпособ в основном для установки центра обработки данных и параметров операционной среды:

  • archaius.deployment.datacenter = default
  • archaius.deployment.environment = test

2, Инициализация контекста сервера EUREKA

initEurekaServerContextИнициализация контекста состоит из многих этапов:

  • Создайте конфигурацию реестра eureka: EurekaServerConfig
  • Создайте конфигурацию экземпляра eureka: EurekaInstanceConfig
  • Информация об экземпляре конструкции: InstanceInfo
  • Создайте диспетчер экземпляров: ApplicationInfoManager
  • Создайте конфигурацию клиента eureka: EurekaClientConfig
  • Создайте клиент eureka: EurekaClient (DiscoveryClient)
  • Создайте реестр (реестр, который знает о кластере eureka): PeerAwareInstanceRegistry
  • Создайте кластер: PeerEurekaNodes
  • Инкапсулировать информацию в контекст eureka: EurekaServerContext
  • Поместите контекст eureka в глобальный контейнер: EurekaServerContextHolder.
  • Инициализировать контекст Eureka
  • Синхронизируйте реестр сервера eureka
  • Включить отслеживание
  • Статистика мониторинга регистрации
protected void initEurekaServerContext() throws Exception {
    // 1、eureka 注册中心配置
    EurekaServerConfig eurekaServerConfig = new DefaultEurekaServerConfig();

    // For backward compatibility
    JsonXStream.getInstance().registerConverter(new V1AwareInstanceInfoConverter(), XStream.PRIORITY_VERY_HIGH);
    XmlXStream.getInstance().registerConverter(new V1AwareInstanceInfoConverter(), XStream.PRIORITY_VERY_HIGH);

    logger.info("Initializing the eureka client...");
    logger.info(eurekaServerConfig.getJsonCodecName());
    ServerCodecs serverCodecs = new DefaultServerCodecs(eurekaServerConfig);

    ApplicationInfoManager applicationInfoManager = null;

    if (eurekaClient == null) {
        // 2、eureka 实例配置
        EurekaInstanceConfig instanceConfig = isCloud(ConfigurationManager.getDeploymentContext())
                ? new CloudInstanceConfig()
                : new MyDataCenterInstanceConfig();

        // 3、构造 InstanceInfo 实例信息
        // 4、构造 ApplicationInfoManager 应用管理器
        applicationInfoManager = new ApplicationInfoManager(
                instanceConfig, new EurekaConfigBasedInstanceInfoProvider(instanceConfig).get());

        // 5、eureka 客户端配置
        EurekaClientConfig eurekaClientConfig = new DefaultEurekaClientConfig();
        // 6、构造 EurekaClient,DiscoveryClient 封装了客户端相关的操作
        eurekaClient = new DiscoveryClient(applicationInfoManager, eurekaClientConfig);
    } else {
        applicationInfoManager = eurekaClient.getApplicationInfoManager();
    }

    PeerAwareInstanceRegistry registry;
    if (isAws(applicationInfoManager.getInfo())) {
        registry = new AwsInstanceRegistry(
                eurekaServerConfig,
                eurekaClient.getEurekaClientConfig(),
                serverCodecs,
                eurekaClient
        );
        awsBinder = new AwsBinderDelegate(eurekaServerConfig, eurekaClient.getEurekaClientConfig(), registry, applicationInfoManager);
        awsBinder.start();
    } else {
        // 7、构造感知eureka集群的注册表
        registry = new PeerAwareInstanceRegistryImpl(
                eurekaServerConfig,
                eurekaClient.getEurekaClientConfig(),
                serverCodecs,
                eurekaClient
        );
    }

    // 8、构造eureka-server集群信息
    PeerEurekaNodes peerEurekaNodes = getPeerEurekaNodes(
            registry,
            eurekaServerConfig,
            eurekaClient.getEurekaClientConfig(),
            serverCodecs,
            applicationInfoManager
    );

    // 9、基于前面构造的对象创建 EurekaServerContext
    serverContext = new DefaultEurekaServerContext(
            eurekaServerConfig,
            serverCodecs,
            registry,
            peerEurekaNodes,
            applicationInfoManager
    );

    // 将 serverContext 放到 EurekaServerContextHolder 上下文中,
    // 这样其它地方都可以通过 EurekaServerContextHolder 拿到 EurekaServerContext
    EurekaServerContextHolder.initialize(serverContext);

    // 10、初始化eureka-server上下文
    serverContext.initialize();
    logger.info("Initialized server context");

    // 11、从相邻的eureka-server同步注册表
    int registryCount = registry.syncUp();
    ///12、启动注册表,启动一些定时任务
    registry.openForTraffic(applicationInfoManager, registryCount);

    ///13、注册监控统计
    EurekaMonitors.registerAllStats();
}

Интерфейсно-ориентированное чтение конфигурации

В инициализации есть три интерфейса конфигурации,EurekaServerConfig、EurekaInstanceConfig、EurekaClientConfig, которые соответствуют получению конфигурации реестра, экземпляра eureka и клиента eureka соответственно.

Как видно из конструкторов их классов реализации по умолчанию, EurekaServerConfig читаетсяeureka-server.propertiesконфигурационный файл, именованный префиксeureka.server; EurekaInstanceConfig, EurekaClientConfig читаютсяeureka-client.propertiesФайлы конфигурации, именованные префиксыeureka.instance,eureka.client.

Здесь видно, что способ, которым eureka получает конфигурацию в коде, получается в виде методов интерфейса, а кодировка конфигурации и значения по умолчанию определяются хардкодингом в ее классе реализации по умолчанию. Этот метод чтения конфигурации на основе интерфейса можно использовать для справки.Этот метод чтения конфигурации легче поддерживать, и нет необходимости поддерживать кучу констант.Если код конфигурации изменится, вам нужно только изменить класс реализации.

Например, следующая конфигурация:

@Override
public int getExpectedClientRenewalIntervalSeconds() {
    final int configured = configInstance.getIntProperty(
            namespace + "expectedClientRenewalIntervalSeconds",
            30).get();
    return configured > 0 ? configured : 30;
}

@Override
public double getRenewalPercentThreshold() {
    return configInstance.getDoubleProperty(
            namespace + "renewalPercentThreshold", 0.85).get();
}

@Override
public boolean shouldEnableReplicatedRequestCompression() {
    return configInstance.getBooleanProperty(
            namespace + "enableReplicatedRequestCompression", false).get();
}

Создание экземпляра службы на основе шаблона построителя

Смотретьnew EurekaConfigBasedInstanceInfoProvider(instanceConfig).get()Этот код завершает построение информации об экземпляре службы в методе get. Здесь в основном используется шаблон проектирования Builder для созданияLeaseInfoа такжеInstanceInfo, возьмем в качестве примера InstanceInfo, он имеет статическийBuilderкласс, черезnewBuilder()Метод создает объект InstanceInfo, а затем вы можете вызвать метод установки свойств Builder для установки свойств.При установке этих свойств будут выполнены некоторые проверки корреляции.После завершения установки будет выполнен вызов.build()Метод возвращает объект, и вы также можете выполнить окончательную проверку в методе сборки. Шаблон Builder очень подходит для создания таких сложных объектов.

public synchronized InstanceInfo get() {
    if (instanceInfo == null) {
        // 续约信息:主要有续约间隔时间(默认30秒)和续约过期时间(默认90秒)
        LeaseInfo.Builder leaseInfoBuilder = LeaseInfo.Builder.newBuilder()
                .setRenewalIntervalInSecs(config.getLeaseRenewalIntervalInSeconds())
                .setDurationInSecs(config.getLeaseExpirationDurationInSeconds());

        if (vipAddressResolver == null) {
            vipAddressResolver = new Archaius1VipAddressResolver();
        }

        // 基于建造者模式来创建 InstanceInfo
        InstanceInfo.Builder builder = InstanceInfo.Builder.newBuilder(vipAddressResolver);

        // set the appropriate id for the InstanceInfo, falling back to datacenter Id if applicable, else hostname
        String instanceId = config.getInstanceId();
        //....

        String defaultAddress;
        if (config instanceof RefreshableInstanceConfig) {
            // Refresh AWS data center info, and return up to date address
            defaultAddress = ((RefreshableInstanceConfig) config).resolveDefaultAddress(false);
        } else {
            defaultAddress = config.getHostName(false);
        }

        // fail safe
        if (defaultAddress == null || defaultAddress.isEmpty()) {
            defaultAddress = config.getIpAddress();
        }

        // 设置属性
        builder.setNamespace(config.getNamespace())
                .setInstanceId(instanceId)
                .setAppName(config.getAppname())
                .setAppGroupName(config.getAppGroupName())
                .setDataCenterInfo(config.getDataCenterInfo())
                .setIPAddr(config.getIpAddress())
                .setHostName(defaultAddress)
                .setPort(config.getNonSecurePort())
                //...
                ;

        builder.setStatus(initialStatus);
        // ...
        // 调用 build 方法做属性校验并创建 InstanceInfo 实例
        instanceInfo = builder.build();
        instanceInfo.setLeaseInfo(leaseInfoBuilder.build());
    }
    return instanceInfo;
}

LeaseInfo 就是续约信息, вы можете видеть, что две основные конфигурации — это интервал обновления и время, в течение которого экземпляр не обновлялся, и экземпляр будет отклонен, если срок его действия истечет. Затем InstanceInfo устанавливается на основе конфигурации, которая представляет собой информацию об экземпляре, включая идентификатор экземпляра, имя хоста, порт, LeaseInfo и т. д.

В реестре создается клиент DiscoveryClient.

В кластерном режиме сервер eureka также регистрируется в других реестрах в качестве клиента, но в это время он сам является клиентом eureka. так будет строитьEurekaClient, чей класс реализации по умолчаниюDiscoveryClient. DiscoveryClient содержит большинство основных функций клиента eureka, таких как регистрация службы, обновление, поддержание пульса, извлечение реестра и т. д.

Открывать шаг за шагом в самый сложный метод строительства, мы сначала проанализировали все это, что было сделано, и столкнутся, многие детали конкретного анализа функций и других компонентов, когда повторно выполняется.

  • Сохраняйте EurekaClientConfig, EurekaInstanceConfig, EurekaTransportConfig, InstanceInfo, ApplicationInfoManager и т. д. в локальные переменные.
  • Если вы хотите получить реестр, создайте счетчик состояния реестра.
  • Если вы хотите зарегистрироваться в реестре, создайте измеритель состояния сердцебиения.
  • Если вы не получите реестр и не зарегистрируетесь в реестре, планировщик и пул потоков пульса не будут созданы, а некоторые ресурсы будут освобождены.
  • Если вы хотите зарегистрироваться в реестре и получить доступ к реестру, инициализируйте некоторые запланированные ресурсы:
    • Создается пул потоков, который поддерживает планирование.Основных потоков два.Как видно со спины, основная задача — обработка пульса и обновление кеша.
    • Создан пул потоков для поддержания пульса, количество основных потоков равно 1, а максимальное количество потоков настроено по умолчанию на 5.
    • Создается пул потоков для обновления кеша, количество основных потоков равно 1, а максимальное количество потоков настроено по умолчанию на 5.
    • Созданы компоненты для сетевого взаимодействия между клиентом eureka и сервером eureka.EurekaTransport, и выполните некоторую инициализацию. Клиент в EurekaTransport в основном инкапсулирует интерфейс вызова API к серверу, который легко вызвать
    • Затем, если вы хотите захватить реестр, вы захватите реестр.Вы можете видеть в fetchRegistry, что он разделен на полный захват и инкрементный захват.При первом запуске он захватит весь объем реестра. .
  • Начните инициализировать задачу планирования:
    • Если вы хотите захватить реестр, создайте задачу для обновления кеша и запуска планирования.По умолчанию реестр захватывается каждые 30 секунд.
    • Если вы хотите зарегистрироваться в реестре, создайте задачу для отправки пульсаций и начните планирование.По умолчанию пульсации отправляются каждые 30 секунд.
    • Если вы хотите зарегистрироваться в реестре, также будет создан распространитель копии экземпляра (который также является запланированной внутренней задачей) и прослушиватель изменений состояния экземпляра.
DiscoveryClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args,
                Provider<BackupRegistry> backupRegistryProvider, EndpointRandomizer endpointRandomizer) {
    //...
    // 将实例信息、配置信息保存到本地
    this.applicationInfoManager = applicationInfoManager;
    InstanceInfo myInfo = applicationInfoManager.getInfo();
    clientConfig = config;
    staticClientConfig = clientConfig;
    transportConfig = config.getTransportConfig();
    instanceInfo = myInfo;
    if (myInfo != null) {
        appPathIdentifier = instanceInfo.getAppName() + "/" + instanceInfo.getId();
    } else {
        logger.warn("Setting instanceInfo to a passed in null value");
    }

    // 设置 Applications
    localRegionApps.set(new Applications());

    fetchRegistryGeneration = new AtomicLong(0);
    remoteRegionsToFetch = new AtomicReference<String>(clientConfig.fetchRegistryForRemoteRegions());
    // 从远程拉取注册表的地址数组,使用的原子类,在运行中可能会动态更新地址
    remoteRegionsRef = new AtomicReference<>(remoteRegionsToFetch.get() == null ? null : remoteRegionsToFetch.get().split(","));

    // 如果要获取注册表,就会注册状态监视器
    if (config.shouldFetchRegistry()) {
        this.registryStalenessMonitor = new ThresholdLevelsMetric(this, METRIC_REGISTRY_PREFIX + "lastUpdateSec_", new long[]{15L, 30L, 60L, 120L, 240L, 480L});
    } else {
        this.registryStalenessMonitor = ThresholdLevelsMetric.NO_OP_METRIC;
    }

    // 如果要注册到 eureka-server,就会创建心跳状态监视器
    if (config.shouldRegisterWithEureka()) {
        this.heartbeatStalenessMonitor = new ThresholdLevelsMetric(this, METRIC_REGISTRATION_PREFIX + "lastHeartbeatSec_", new long[]{15L, 30L, 60L, 120L, 240L, 480L});
    } else {
        this.heartbeatStalenessMonitor = ThresholdLevelsMetric.NO_OP_METRIC;
    }

    logger.info("Initializing Eureka in region {}", clientConfig.getRegion());

    // 如果不注册到注册中心,且不拉取注册表,就不创建调度器、线程池等资源了
    if (!config.shouldRegisterWithEureka() && !config.shouldFetchRegistry()) {
        logger.info("Client configured to neither register nor query for data.");
        scheduler = null;
        heartbeatExecutor = null;
        cacheRefreshExecutor = null;
        eurekaTransport = null;
        instanceRegionChecker = new InstanceRegionChecker(new PropertyBasedAzToRegionMapper(config), clientConfig.getRegion());

		//....
        return;  // no need to setup up an network tasks and we are done
    }

    try {
        // 创建定时调度器,默认有2个核心线程,主要处理心跳任务和缓存刷新任务
        scheduler = Executors.newScheduledThreadPool(2,
                new ThreadFactoryBuilder().setNameFormat("DiscoveryClient-%d").setDaemon(true).build());

        // 维持心跳的线程池,一个核心线程,最大线程数默认5。
        // 注意其使用的队列是 SynchronousQueue 队列,这个队列只能放一个任务,一个线程将任务取走后,才能放入下一个任务,否则只能阻塞。
        heartbeatExecutor = new ThreadPoolExecutor(
                1, clientConfig.getHeartbeatExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
                new SynchronousQueue<Runnable>(),
                new ThreadFactoryBuilder().setNameFormat("DiscoveryClient-HeartbeatExecutor-%d").setDaemon(true).build()
        );  // use direct handoff

        // 刷新缓存的线程池,一个核心线程,最大线程数据默认为5
        cacheRefreshExecutor = new ThreadPoolExecutor(
                1, clientConfig.getCacheRefreshExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
                new SynchronousQueue<Runnable>(),
                new ThreadFactoryBuilder().setNameFormat("DiscoveryClient-CacheRefreshExecutor-%d").setDaemon(true).build()
        );  // use direct handoff

        // eureka http 调用客户端,支持 eureka client 与 eureka server 之间的通信
        eurekaTransport = new EurekaTransport();
        // 初始化 eurekaTransport
        scheduleServerEndpointTask(eurekaTransport, args);

        //...
    } catch (Throwable e) {
        throw new RuntimeException("Failed to initialize DiscoveryClient!", e);
    }

    if (clientConfig.shouldFetchRegistry()) {
        try {
            // 拉取注册表:全量抓取和增量抓取
            boolean primaryFetchRegistryResult = fetchRegistry(false);
            if (!primaryFetchRegistryResult) {
                logger.info("Initial registry fetch from primary servers failed");
            }
            //...
        } catch (Throwable th) {
            logger.error("Fetch registry error at startup: {}", th.getMessage());
            throw new IllegalStateException(th);
        }
    }

    // call and execute the pre registration handler before all background tasks (inc registration) is started
    if (this.preRegistrationHandler != null) {
        this.preRegistrationHandler.beforeRegistration();
    }

    if (clientConfig.shouldRegisterWithEureka() && clientConfig.shouldEnforceRegistrationAtInit()) {
        try {
            if (!register() ) {
                throw new IllegalStateException("Registration error at startup. Invalid server response.");
            }
        } catch (Throwable th) {
            logger.error("Registration error at startup: {}", th.getMessage());
            throw new IllegalStateException(th);
        }
    }

    // 初始化一些调度任务:刷新缓存的调度任务、发送心跳的调度任务、实例副本传播器
    initScheduledTasks();

    //...
}

Инициализируйте запланированную задачу:

private void initScheduledTasks() {
    if (clientConfig.shouldFetchRegistry()) {
        // 抓取注册表的间隔时间,默认30秒
        int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();
        // 刷新缓存调度器延迟时间扩大倍数,在任务超时的时候,将扩大延迟时间
        // 这在出现网络抖动、eureka-sever 不可用时,可以避免频繁发起无效的调度
        int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
        // 注册表刷新的定时任务
        cacheRefreshTask = new TimedSupervisorTask(
                "cacheRefresh", scheduler, cacheRefreshExecutor,
                registryFetchIntervalSeconds, TimeUnit.SECONDS, expBackOffBound,
                new CacheRefreshThread() // 刷新注册表的任务
        );
        // 30秒后开始调度刷新注册表的任务
        scheduler.schedule(cacheRefreshTask, registryFetchIntervalSeconds, TimeUnit.SECONDS);
    }

    if (clientConfig.shouldRegisterWithEureka()) {
        // 续约间隔时间,默认30秒
        int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs();
        // 心跳调度器的延迟时间扩大倍数,默认10
        int expBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound();
        logger.info("Starting heartbeat executor: " + "renew interval is: {}", renewalIntervalInSecs);

        // 心跳的定时任务
        heartbeatTask = new TimedSupervisorTask(
                "heartbeat", scheduler, heartbeatExecutor, renewalIntervalInSecs,
                TimeUnit.SECONDS, expBackOffBound, new HeartbeatThread()
        );
        // 30秒后开始调度心跳的任务
        scheduler.schedule(heartbeatTask, renewalIntervalInSecs, TimeUnit.SECONDS);

        // 实例副本传播器,用于定时更新自己状态
        instanceInfoReplicator = new InstanceInfoReplicator(this, instanceInfo, clientConfig.getInstanceInfoReplicationIntervalSeconds(), 2);

        // 实例状态变更的监听器
        statusChangeListener = new ApplicationInfoManager.StatusChangeListener() {
            @Override
            public String getId() {
                return "statusChangeListener";
            }

            @Override
            public void notify(StatusChangeEvent statusChangeEvent) {
            	//...
                // 通知重新注册实例
                instanceInfoReplicator.onDemandUpdate();
            }
        };

        // 向 ApplicationInfoManager 注册监听器
        if (clientConfig.shouldOnDemandUpdateStatusChange()) {
            applicationInfoManager.registerStatusChangeListener(statusChangeListener);
        }

        // 启动副本传播器,默认延迟时间40秒
        instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds());
    } else {
        logger.info("Not registering with Eureka server per configuration");
    }
}

Дизайн диспетчера запланированных задач

Видно, что клиент eureka использует временные задачи и планировщики для регулярной отправки пульсаций и захвата реестра.Я думаю, что идея дизайна временного планирования здесь может быть использована для справки.

Возьмите этот код для задачи сердцебиения в качестве примера:

if (clientConfig.shouldFetchRegistry()) {
    // 抓取注册表的间隔时间,默认30秒
    int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();
    // 刷新缓存调度器延迟时间扩大倍数,在任务超时的时候,将扩大延迟时间
    // 这在出现网络抖动、eureka-sever 不可用时,可以避免频繁发起无效的调度
    int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
    // 注册表刷新的定时任务
    cacheRefreshTask = new TimedSupervisorTask(
            "cacheRefresh", scheduler, cacheRefreshExecutor,
            registryFetchIntervalSeconds, TimeUnit.SECONDS, expBackOffBound,
            new CacheRefreshThread() // 刷新注册表的任务
    );
    // 30秒后开始调度刷新注册表的任务
    scheduler.schedule(cacheRefreshTask, registryFetchIntervalSeconds, TimeUnit.SECONDS);
}

Вышеприведенный код на самом деле не сложный, главное создать задачу на время, а затем с помощью планировщика запустить планирование после определенной задержки. Но он не использует планировщик напрямую для планирования задач здесь (CacheRefreshThread), а также не запланировано с фиксированной частотой (каждые 30 секунд). Он определяет руководителя для задачиTimedSupervisorTask, при создании этого супервизора передаются такие параметры, как планировщик, задача, которая должна быть выполнена, и интервал времени, а затем планировщик планирует TimedSupervisorTask.

Глядя на метод построения TimedSupervisorTask, можно выделить следующие моменты:

  • Время ожидания задачи равно времени интервала, которое по умолчанию составляет 30 секунд, а затем время задержки по умолчанию равно времени ожидания.Если сервер eureka не работает или возникла проблема с сетью, может быть тайм-аут
  • Установлено максимальное время задержки, и по умолчанию оно в 10 раз больше, чем время ожидания, то есть 300 секунд.
  • Наконец, были созданы некоторые счетчики для подсчета количества успехов, тайм-аутов, отказов и исключений.Вы можете видеть, что у него есть статистика по планированию задач здесь.
public TimedSupervisorTask(String name, ScheduledExecutorService scheduler, ThreadPoolExecutor executor,
                           int timeout, TimeUnit timeUnit, int expBackOffBound, Runnable task) {
    this.name = name;
    this.scheduler = scheduler;
    this.executor = executor;
    // 任务超时时间就等于任务调度的间隔时间
    this.timeoutMillis = timeUnit.toMillis(timeout);
    this.task = task;
    // 延迟时间默认为超时时间
    this.delay = new AtomicLong(timeoutMillis);
    // 最大延迟时间,默认在超时时间的基础上扩大10倍
    this.maxDelay = timeoutMillis * expBackOffBound;

    // 初始化计数器并注册
    successCounter = Monitors.newCounter("success");
    timeoutCounter = Monitors.newCounter("timeouts");
    rejectedCounter = Monitors.newCounter("rejectedExecutions");
    throwableCounter = Monitors.newCounter("throwables");
    threadPoolLevelGauge = new LongGauge(MonitorConfig.builder("threadPoolUsed").build());
    Monitors.registerObject(name, this);
}

Посмотрите еще раз на метод запуска TimedSupervisorTask:

  • 1) Сначала отправьте задачу в пул потоков для асинхронного выполнения.Он не запускает задачу напрямую, а асинхронно отправляет ее в пул потоков, что может обеспечить ожидание тайм-аута, не затрагивая основную задачу.
  • 2. Затем первый тайм-аут будет запланирован через 300 секунд. Если сервер Eureka доступен в течение 300 секунд, и новый экземпляр обслуживания зарегистрирован, клиент не сможет воспринимать его вовремя, поэтому я думаю, что параметр, соответствующий getCacherefreshexecutorexponnizergebackoffbound, может быть надлежащим образом установлен меньше (по умолчанию 10 раз).
  • 3) Если время ожидания задачи не истекло, после успешного планирования время задержки будет сброшено до времени ожидания по умолчанию. Наконец, следующее планирование выполняется в finally.
public void run() {
    Future<?> future = null;
    try {
        // 提交任务到线程池
        future = executor.submit(task);
        threadPoolLevelGauge.set((long) executor.getActiveCount());
        // 阻塞直到任务完成或超时
        future.get(timeoutMillis, TimeUnit.MILLISECONDS);
        // 任务完成后,重置延迟时间为超时时间,即30秒
        delay.set(timeoutMillis);
        threadPoolLevelGauge.set((long) executor.getActiveCount());
        // 成功次数+1
        successCounter.increment();
    } catch (TimeoutException e) {
        logger.warn("task supervisor timed out", e);
        // 超时次数+1
        timeoutCounter.increment();

        // 如果任务超时了,就会增大延迟时间,当前延迟时间*2,然后取一个最大值
        long currentDelay = delay.get();
        long newDelay = Math.min(maxDelay, currentDelay * 2);
        // 设置为最大的一个延迟时间
        delay.compareAndSet(currentDelay, newDelay);

    } catch (RejectedExecutionException e) {
        //...
        rejectedCounter.increment();
    } catch (Throwable e) {
        //...
        throwableCounter.increment();
    } finally {
        if (future != null) {
            future.cancel(true);
        }
        if (!scheduler.isShutdown()) {
            // 延迟 delay 时间后,继续调度任务
            scheduler.schedule(this, delay.get(), TimeUnit.MILLISECONDS);
        }
    }
}

Подводя итог этому дизайну:

  • 1) Прежде всего, при выполнении удаленного вызова учитывайте, что сеть недоступна, серверная часть не работает и т. д., что приводит к тайм-ауту вызова.Вы можете использовать пул потоков для асинхронной отправки задач для реализации ожидания механизм тайм-аута.
  • 2) После тайм-аута можно предположить, что сервису может потребоваться определенное время для восстановления доступного состояния.Если он по-прежнему запланирован на исходный интервал времени, он все еще может истечь по тайм-ауту, поэтому время задержки увеличивается. Если вызов успешен, что указывает на то, что он восстановился, сбросьте время задержки.
  • 3) Планирование задач по времени циклически планируется с определенным временем задержки (schedule), время задержки может быть изменено в соответствии с реальной ситуацией, а не запланировано с фиксированной частотой с самого начала (scheduleAtFixedRate).
  • 4) Для задач с синхронизацией и задач в пуле потоков лучше всего хорошо поработать со статистикой состояния выполнения задач, чтобы наблюдать за планированием задач.

Создание реестра

Затем построитьPeerAwareInstanceRegistry, с точки зрения именования, это реестр, который может воспринимать кластер eureka, то есть в режиме кластера сервер eureka подтягивает реестр с других узлов сервера. Его класс реализации по умолчаниюPeerAwareInstanceRegistryImpl, унаследовано отAbstractInstanceRegistry, который является реестром экземпляра.

1. Создайте PeerAwareInstanceRegistry

Введите конструктор PeerAwareInstanceRegistryImpl:

  • Первый — создать PeerAwareInstanceRegistry, передав в конструктор ранее созданные EurekaServerConfig, EurekaClientConfig, EurekaClient и т. д.
  • Вызывается конструктор super, который в основном инициализирует следующие вещи:
    • Круговая очередь, содержащая недавно отключенные экземпляры
    • Круговая очередь, содержащая недавно зарегистрированные экземпляры
    • Счетчик количества обновлений за последнюю минуту
    • Запланированные задачи по удалению экземпляров из недавно измененной очереди
  • Затем создается счетчик numberOfReplicationsLastMin для количества синхронизаций кластера за последнюю минуту. MeasuredRate Мы проанализируем его структуру позже.
public PeerAwareInstanceRegistryImpl(EurekaServerConfig serverConfig, EurekaClientConfig clientConfig,
        ServerCodecs serverCodecs, EurekaClient eurekaClient) {
    super(serverConfig, clientConfig, serverCodecs);
    this.eurekaClient = eurekaClient;
    // 最近一分钟集群同步的次数计数器
    this.numberOfReplicationsLastMin = new MeasuredRate(1000 * 60 * 1);
    // We first check if the instance is STARTING or DOWN, then we check explicit overrides,
    // then we check the status of a potentially existing lease.
    this.instanceStatusOverrideRule = new FirstMatchWinsCompositeRule(new DownOrStartingRule(),
            new OverrideExistsRule(overriddenInstanceStatusMap), new LeaseExistsRule());
}

///////////////////////////////////////////////

protected AbstractInstanceRegistry(EurekaServerConfig serverConfig, EurekaClientConfig clientConfig, ServerCodecs serverCodecs) {
    this.serverConfig = serverConfig;
    this.clientConfig = clientConfig;
    this.serverCodecs = serverCodecs;
    // 最近下线的循环队列
    this.recentCanceledQueue = new CircularQueue<Pair<Long, String>>(1000);
    // 最近注册的循环队列
    this.recentRegisteredQueue = new CircularQueue<Pair<Long, String>>(1000);

    // 最近一分钟续约的计数器
    this.renewsLastMin = new MeasuredRate(1000 * 60 * 1);
    // 一个定时调度任务,定时剔除最近改变队列中过期的实例
    this.deltaRetentionTimer.schedule(getDeltaRetentionTask(),
            serverConfig.getDeltaRetentionTimerIntervalInMs(),
            serverConfig.getDeltaRetentionTimerIntervalInMs());
}

Конкретные детали этой части будут видны позже, когда будут проанализированы конкретные функции.Нам сначала нужно знать, что есть эти очереди и счетчики.

2. Круговая очередьCircularQueueдизайн

Как видно из метода построения, он использует циклическую очередь для сохранения информации о недавно отключенных и недавно зарегистрированных экземплярах, а емкость фиксирована на уровне 1000, так что количество последних экземпляров контролируется в пределах 1000.

CircularQueue — это настраиваемая циклическая очередь, унаследованная отAbstractQueue. На самом деле это агент.ArrayBlockingQueue, затем переопределение поставленного в очередьofferметод, когда очередь заполнена, он берет элемент из головы и помещает его в конец очереди.

static class CircularQueue<E> extends AbstractQueue<E> {
    private final ArrayBlockingQueue<E> delegate;
    private final int capacity;

    public CircularQueue(int capacity) {
        this.capacity = capacity;
        this.delegate = new ArrayBlockingQueue<>(capacity);
    }

    @Override
    public Iterator<E> iterator() {
        return delegate.iterator();
    }

    @Override
    public int size() {
        return delegate.size();
    }

    @Override
    public boolean offer(E e) {
        // 如果队列满了,就取出头部的一个元素,然后再放到尾部
        while (!delegate.offer(e)) {
            delegate.poll();
        }
        return true;
    }

    @Override
    public E poll() {
        return delegate.poll();
    }

    @Override
    public E peek() {
        return delegate.peek();
    }
}

Создайте контекст сервера Eureka и инициализируйте

Далее нужно создатьPeerEurekaNodes, который должен представлять кластер eureka. Затем создайте контекст сервера eureka EurekaServerContext на основе чего-то, созданного ранее, изDefaultEurekaServerContextВы можете увидеть метод построения, просто инкапсулируйте ранее созданные вещи для глобального использования. затем поместите serverContext вEurekaServerContextHolder, чтобы другие места могли получить serverContext через этот держатель.

Далее необходимо инициализировать контекст сервера eureka:

  • Запустите кластер eureka:
    • В основном для запуска запланированной задачи (интервал по умолчанию составляет 10 минут) для обновления информации об узлах кластера eureka и обновления в соответствии с настроенным адресом сервера eureka.PeerEurekaNode, так что, когда сервер eureka находится в автономном или онлайновом режиме, другие серверные узлы могут быть обнаружены вовремя. PeerEurekaNode в основном используется для синхронизации данных между узлами кластера, что будет подробно проанализировано позже при анализе кластера.
  • Инициализация реестра:
    • Сначала запустите созданный ранее счетчик:numberOfReplicationsLastMin
    • Инициализируйте кеш ответов, сервер eureka создает многоуровневый кеш для ответа на запрос клиента на обход реестра. Дизайн этого многоуровневого кеша является ядром ответа на частые запросы к реестру. Позже мы проанализируем реестр обхода клиента. , подробный анализ, когда
    • Регулярно планируйте задачи по обновлению порога продления, в основном для обновленияnumberOfRenewsPerMinThresholdЭто значение, которое представляет собой количество обновлений в минуту, будет анализироваться при анализе продления.
    • инициализацияRemoteRegionRegistry,догадка связана с развертыванием нескольких регионов(региона) эврики
public void initialize() {
    logger.info("Initializing ...");
    // 启动eureka集群
    peerEurekaNodes.start();
    try {
        // 注册表初始化
        registry.init(peerEurekaNodes);
    } catch (Exception e) {
        throw new RuntimeException(e);
    }
    logger.info("Initialized");
}

Метод запуска PeerEurekaNodes:

public void start() {
    // 单个线程的线程池
    taskExecutor = Executors.newSingleThreadScheduledExecutor(
            new ThreadFactory() {
                @Override
                public Thread newThread(Runnable r) {
                    Thread thread = new Thread(r, "Eureka-PeerNodesUpdater");
                    thread.setDaemon(true);
                    return thread;
                }
            }
    );
    try {
        // 根据集群地址更新 PeerEurekaNode,PeerEurekaNode 就包含了调度其它注册中心的客户端
        updatePeerEurekaNodes(resolvePeerUrls());
        Runnable peersUpdateTask = new Runnable() {
            @Override
            public void run() {
                try {
                    updatePeerEurekaNodes(resolvePeerUrls());
                } catch (Throwable e) {
                    logger.error("Cannot update the replica Nodes", e);
                }

            }
        };
        // 定时跟新集群信息 PeerEurekaNode,如果有eureka-server不可用了,就可以及时下线,或者新上线了eureka-server,可以及时感知到
        taskExecutor.scheduleWithFixedDelay(peersUpdateTask,
                serverConfig.getPeerEurekaNodesUpdateIntervalMs(),
                serverConfig.getPeerEurekaNodesUpdateIntervalMs(),
                TimeUnit.MILLISECONDS
        );
    } catch (Exception e) {
        throw new IllegalStateException(e);
    }
    for (PeerEurekaNode node : peerEurekaNodes) {
        logger.info("Replica node URL:  {}", node.getServiceUrl());
    }
}

PeerAwareInstanceRegistryImpl изinitметод:

public void init(PeerEurekaNodes peerEurekaNodes) throws Exception {
    // 启动计数器
    this.numberOfReplicationsLastMin.start();
    this.peerEurekaNodes = peerEurekaNodes;
    // 初始化响应缓存,eureka server 构造了一个多级缓存来响应客户端抓取注册表的请求
    initializedResponseCache();
    // 定时调度任务更新续约阀值,主要就是更新 numberOfRenewsPerMinThreshold 这个值,即每分钟续约次数
    scheduleRenewalThresholdUpdateTask();
    // 初始化 RemoteRegionRegistry
    initRemoteRegionRegistry();

    try {
        Monitors.registerObject(this);
    } catch (Throwable e) {
        logger.warn("Cannot register the JMX monitor for the InstanceRegistry :", e);
    }
}

Завершите инициализацию сервера Eureka.

Затем посмотрите на последние несколько шагов:

  • первый звонокregistry.syncUp()Синхронизируйте локальный экземпляр EurekaClient с реестром.В кластерном режиме сервер eureka также является клиентом, поэтому реестр других реестров будет синхронизирован с реестром текущего сервера. По умолчанию он будет повторяться 5 раз с интервалом в 30 секунд. В автономном режиме количество повторных попыток должно быть установлено равным 0.
  • тогда позвониregistry.openForTrafficВыполните окончательную инициализацию:
    • Обновите порог продления в минуту
    • установить состояние экземпляра
    • Запустите счетчик, который подсчитывает количество обновлений за последнюю минуту.
    • Запустите запланированное задание, чтобы исключить автономные экземпляры. Запланированное задание по умолчанию выполняется каждые 60 секунд.
  • Последний шаг — зарегистрировать некоторую статистику мониторинга самой eureka.

syncUpметод:

public int syncUp() {
    // Copy entire entry from neighboring DS node
    int count = 0;

    // 注册表同步重试次数,默认5次
    for (int i = 0; ((i < serverConfig.getRegistrySyncRetries()) && (count == 0)); i++) {
        if (i > 0) {
            try {
                // 同步重试时间,默认30秒
                Thread.sleep(serverConfig.getRegistrySyncRetryWaitMs());
            } catch (InterruptedException e) {
                logger.warn("Interrupted during registry transfer..");
                break;
            }
        }
        Applications apps = eurekaClient.getApplications();
        for (Application app : apps.getRegisteredApplications()) {
            for (InstanceInfo instance : app.getInstances()) {
                try {
                    if (isRegisterable(instance)) {
                        // 注册实例
                        register(instance, instance.getLeaseInfo().getDurationInSecs(), true);
                        count++;
                    }
                } catch (Throwable t) {
                    logger.error("During DS init copy", t);
                }
            }
        }
    }
    return count;
}

Метод openForTraffic:

@Override
public void openForTraffic(ApplicationInfoManager applicationInfoManager, int count) {
    // 期望的客户端每分钟的续约次数
    this.expectedNumberOfClientsSendingRenews = count;
    // 更新每分钟续约阀值
    updateRenewsPerMinThreshold();
    logger.info("Got {} instances from neighboring DS node", count);
    logger.info("Renew threshold is: {}", numberOfRenewsPerMinThreshold);
    this.startupTime = System.currentTimeMillis();
    if (count > 0) {
        this.peerInstancesTransferEmptyOnStartup = false;
    }
    DataCenterInfo.Name selfName = applicationInfoManager.getInfo().getDataCenterInfo().getName();
    boolean isAws = Name.Amazon == selfName;
    if (isAws && serverConfig.shouldPrimeAwsReplicaConnections()) {
        logger.info("Priming AWS connections for all replicas..");
        primeAwsReplicas(applicationInfoManager);
    }
    logger.info("Changing status to UP");
    // 设置实例状态为已启动
    applicationInfoManager.setInstanceStatus(InstanceStatus.UP);
    super.postInit();
}

///////////////////////////////////////

protected void postInit() {
    // 启动 统计最近一分钟续约次数的计数器
    renewsLastMin.start();
    if (evictionTaskRef.get() != null) {
        evictionTaskRef.get().cancel();
    }
    // 定时剔除任务
    evictionTaskRef.set(new EvictionTask());
    evictionTimer.schedule(evictionTaskRef.get(),
            serverConfig.getEvictionIntervalTimerInMs(),
            serverConfig.getEvictionIntervalTimerInMs());
}

Блок-схема запуска сервера Eureka

На следующем рисунке показан процесс инициализации запуска сервера eureka.

Eureka Client запускает инициализацию

Для начальной инициализации клиента eureka рассмотрим модуль eureka-examples.ExampleEurekaClientЭтот класс в своем основном методе имитирует запуск инициализации в качестве клиента eureka и отправляет запрос в реестр.

Инициализация сервера eureka фактически включает в себя инициализацию клиента.Видно, что инициализация клиента в основном включает в себя следующие вещи:

  • читатьeureka-client.propertiesФайл конфигурации, создайте EurekaInstanceConfig
  • Создание информации об экземпляре InstanceInfo на основе InstanceConfig
  • Создайте диспетчер экземпляров приложений ApplicationInfoManager на основе InstanceConfig и InstanceInfo.
  • читатьeureka-client.propertiesконфигурационный файл, создайтеEurekaClientConfig
  • Создайте EurekaClient (DiscoveryClient) на основе диспетчера экземпляров приложения и clientConfig, процесс инициализации аналогичен созданию DiscoveryClient в процессе инициализации сервера eureka.
public static void main(String[] args) {
    ExampleEurekaClient sampleClient = new ExampleEurekaClient();

    // 基于实例配置和实例信息创建应用实例管理器
    ApplicationInfoManager applicationInfoManager = initializeApplicationInfoManager(new MyDataCenterInstanceConfig());
    // 基于应用实例管理器和客户端配置创建 EurekaClient(DiscoveryClient)
    EurekaClient client = initializeEurekaClient(applicationInfoManager, new DefaultEurekaClientConfig());

    // use the client
    sampleClient.sendRequestToServiceUsingEureka(client);

    // shutdown the client
    eurekaClient.shutdown();
}