Статья была впервые опубликована в публичном аккаунте WeChat «Программист Гогуо».
Введение
Eureka — это служба на основе REST (передача репрезентативного состояния), которая в основном используется в облаке AWS для поиска служб для балансировки нагрузки и аварийного переключения серверов среднего уровня. Мы называем этот сервис Eureka Server. Eureka также поставляется с клиентским компонентом на основе Java, Eureka Client, который значительно упрощает взаимодействие со службами. Клиент также имеет встроенный балансировщик нагрузки, который выполняет базовую циклическую балансировку нагрузки. В Netflix более сложный балансировщик нагрузки, включающий Eureka, обеспечивает взвешенную балансировку нагрузки на основе различных факторов, таких как трафик, использование ресурсов, ошибки и т. д., для обеспечения превосходной отказоустойчивости. Давайте сначала посмотрим на архитектурную схему Netflix Eureka на github, как показано ниже:
Из рисунка видно, что в этой системе есть две роли, а именно Eureka Server и Eureka Client. Клиент Eureka разделен на Службу приложений и Клиент приложений, то есть на поставщиков услуг и потребителей услуг. В каждом регионе есть один кластер Eureka, и по крайней мере один сервер eureka в каждом регионе может обработать сбой региона в случае выхода сервера из строя.
Eureka ClientсуществуетEureka Serverзарегистрируйтесь, затемEureka Clientкаждые 30 секунд доEureka ServerОтправьте пульс, чтобы продлить аренду. еслиEureka ClientНевозможно продлить аренду несколько раз, примерно через 90 секунд.Eureka ServerУдалите его из реестра сервера. Информация о регистрации и продлениях реплицируется на всеEureka Serverузел. Клиенты из любого региона могут искать информацию в реестре (происходит каждые 30 секунд). На основе этой информации в реестре Application Client может удаленно вызывать службу приложений для использования службы.
Анализ исходного кода
Клиент Eureka, предоставляемый NetFlix, можно использовать, добавив аннотацию @EnableDiscoveryClient в класс запуска на стороне клиента eureka на основе Spring Cloud. Давайте используем @EnableDiscoveryClient в качестве записи для анализа исходного кода Eureka Client.
@EnableDiscoveryClient, по исходному коду можно узнать, что это аннотация разметки:
/**
* Annotation to enable a DiscoveryClient implementation.
* @author Spencer Gibb
*/
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Inherited
@Import(EnableDiscoveryClientImportSelector.class)
public @interface EnableDiscoveryClient {
boolean autoRegister() default true;
}
Через аннотацию вы можете узнать, что аннотация @EnableDiscoveryClient используется для включения реализации DiscoveryClient.Код интерфейса DiscoveryClient выглядит следующим образом:
public interface DiscoveryClient {
String description();
List<ServiceInstance> getInstances(String serviceId);
List<String> getServices();
}
Описание интерфейса:
- description(): Описание реализации.
- getInstances (String serviceId): получить все экземпляры ServiceInstances, связанные с определенным идентификатором службы.
- getServices(): возвращает все известные идентификаторы служб.
Структурная схема реализации интерфейса DiscoveryClient:
- EurekaDiscoveryClient: класс реализации Eureka DiscoveryClient.
- CompositeDiscoveryClient: порядок клиентов обнаружения, используемый для сортировки доступных клиентов.
- NoopDiscoveryClient: класс реализации обнаружения службы, который ничего не делает, объявлен устаревшим.
- SimpleDiscoveryClient: класс реализации простого обнаружения службы SimpleDiscoveryClient, конкретный экземпляр службы получается из конфигурации SimpleDiscoveryProperties.
EurekaDiscoveryClient — это реализация Eureka интерфейса DiscoveryClient Код выглядит следующим образом:
public class EurekaDiscoveryClient implements DiscoveryClient {
public static final String DESCRIPTION = "Spring Cloud Eureka Discovery Client";
private final EurekaInstanceConfig config;
private final EurekaClient eurekaClient;
public EurekaDiscoveryClient(EurekaInstanceConfig config, EurekaClient eurekaClient) {
this.config = config;
this.eurekaClient = eurekaClient;
}
@Override
public String description() {
return DESCRIPTION;
}
@Override
public List<ServiceInstance> getInstances(String serviceId) {
List<InstanceInfo> infos = this.eurekaClient.getInstancesByVipAddress(serviceId,
false);
List<ServiceInstance> instances = new ArrayList<>();
for (InstanceInfo info : infos) {
instances.add(new EurekaServiceInstance(info));
}
return instances;
}
@Override
public List<String> getServices() {
Applications applications = this.eurekaClient.getApplications();
if (applications == null) {
return Collections.emptyList();
}
List<Application> registered = applications.getRegisteredApplications();
List<String> names = new ArrayList<>();
for (Application app : registered) {
if (app.getInstances().isEmpty()) {
continue;
}
names.add(app.getName().toLowerCase());
}
return names;
}
}
Из кода видно, что EurekaDiscoveryClient реализует стандартный интерфейс, определенный DiscoveryClient.Настоящей реализацией службы обнаружения является EurekaClient.Ниже представлена схема структуры зависимостей EurekaClient:
Единственным классом реализации EurekaClient является DiscoveryClient Метод построения DiscoveryClient выглядит следующим образом:
@Inject
DiscoveryClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args,
Provider<BackupRegistry> backupRegistryProvider) {
//省略...
try {
// default size of 2 - 1 each for heartbeat and cacheRefresh
scheduler = Executors.newScheduledThreadPool(2,
new ThreadFactoryBuilder()
.setNameFormat("DiscoveryClient-%d")
.setDaemon(true)
.build());
heartbeatExecutor = new ThreadPoolExecutor(
1, clientConfig.getHeartbeatExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
new ThreadFactoryBuilder()
.setNameFormat("DiscoveryClient-HeartbeatExecutor-%d")
.setDaemon(true)
.build()
); // use direct handoff
cacheRefreshExecutor = new ThreadPoolExecutor(
1, clientConfig.getCacheRefreshExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
new ThreadFactoryBuilder()
.setNameFormat("DiscoveryClient-CacheRefreshExecutor-%d")
.setDaemon(true)
.build()
); // use direct handoff
//省略...
initScheduledTasks();
try {
Monitors.registerObject(this);
} catch (Throwable e) {
logger.warn("Cannot register timers", e);
}
//省略...
}
Вы можете видеть, что этот метод построения в основном делает следующие вещи:
- Создан пул потоков запланированных задач планировщика, пул потоков проверки пульса heartbeatExecutor (Продление услуги), cacheRefreshExecutor(Приобретение услуг)
- Затем initScheduledTasks() открывает указанные выше три пула потоков и добавляет соответствующие задачи в указанные выше три пула потоков соответственно. Затем создайте instanceInfoReplicator (запускаемая задача), затем вызовите метод InstanceInfoReplicator.start и поместите эту задачу в указанный выше пул потоков запланированных задач планировщика (Регистрация и обновление сервиса).
Регистрация службы (Реестр)
Как упоминалось выше, метод InstanceInfoReplicator.start() вызывается в методе initScheduledTasks(), а код метода run() InstanceInfoReplicator выглядит следующим образом:
public void run() {
try {
discoveryClient.refreshInstanceInfo();
Long dirtyTimestamp = instanceInfo.isDirtyWithTime();
if (dirtyTimestamp != null) {
discoveryClient.register();
instanceInfo.unsetIsDirty(dirtyTimestamp);
}
} catch (Throwable t) {
logger.warn("There was a problem with the instance info replicator", t);
} finally {
Future next = scheduler.schedule(this, replicationIntervalSeconds, TimeUnit.SECONDS);
scheduledPeriodicRef.set(next);
}
}
Метод запуска InstanceInfoReplicator найден, и метод регистрации DiscoveryClient будет вызываться в методе запуска. Код метода регистрации DiscoveryClient выглядит следующим образом:
/**
* Register with the eureka service by making the appropriate REST call.
*/
boolean register() throws Throwable {
logger.info(PREFIX + "{}: registering service...", appPathIdentifier);
EurekaHttpResponse<Void> httpResponse;
try {
httpResponse = eurekaTransport.registrationClient.register(instanceInfo);
} catch (Exception e) {
logger.warn(PREFIX + "{} - registration failed {}", appPathIdentifier, e.getMessage(), e);
throw e;
}
if (logger.isInfoEnabled()) {
logger.info(PREFIX + "{} - registration status: {}", appPathIdentifier, httpResponse.getStatusCode());
}
return httpResponse.getStatusCode() == 204;
}
Наконец, после серии вызовов будет вызван метод регистрации класса AbstractJerseyEurekaHttpClient. Код выглядит следующим образом:
public EurekaHttpResponse<Void> register(InstanceInfo info) {
String urlPath = "apps/" + info.getAppName();
ClientResponse response = null;
try {
Builder resourceBuilder = jerseyClient.resource(serviceUrl).path(urlPath).getRequestBuilder();
addExtraHeaders(resourceBuilder);
response = resourceBuilder
.header("Accept-Encoding", "gzip")
.type(MediaType.APPLICATION_JSON_TYPE)
.accept(MediaType.APPLICATION_JSON)
.post(ClientResponse.class, info);
return anEurekaHttpResponse(response.getStatus()).headers(headersOf(response)).build();
} finally {
if (logger.isDebugEnabled()) {
logger.debug("Jersey HTTP POST {}/{} with instance {}; statusCode={}", serviceUrl, urlPath, info.getId(),
response == null ? "N/A" : response.getStatus());
}
if (response != null) {
response.close();
}
}
}
Вы можете видеть, что серверная часть eureka, наконец, запрашивается через http rest, а экземпляр самого приложения InstanceInfo регистрируется на стороне сервера.Давайте полностью разберем процесс регистрации службы:
Продлить продление услуги
Продление услуги очень похоже на регистрацию услуги, код HeartbeatThread выглядит следующим образом:
private class HeartbeatThread implements Runnable {
public void run() {
if (renew()) {
//更新最后一次心跳的时间
lastSuccessfulHeartbeatTimestamp = System.currentTimeMillis();
}
}
}
// 续约的主方法
boolean renew() {
EurekaHttpResponse<InstanceInfo> httpResponse;
try {
httpResponse = eurekaTransport.registrationClient.sendHeartBeat(instanceInfo.getAppName(), instanceInfo.getId(), instanceInfo, null);
logger.debug(PREFIX + "{} - Heartbeat status: {}", appPathIdentifier, httpResponse.getStatusCode());
if (httpResponse.getStatusCode() == 404) {
REREGISTER_COUNTER.increment();
logger.info(PREFIX + "{} - Re-registering apps/{}", appPathIdentifier, instanceInfo.getAppName());
long timestamp = instanceInfo.setIsDirtyWithTime();
boolean success = register();
if (success) {
instanceInfo.unsetIsDirty(timestamp);
}
return success;
}
return httpResponse.getStatusCode() == 200;
} catch (Throwable e) {
logger.error(PREFIX + "{} - was unable to send heartbeat!", appPathIdentifier, e);
return false;
}
}
Отправьте пульс и запросите сервер eureka.Если интерфейс возвращает 404, это означает, что служба не существует, затем снова пройдите процесс регистрации.
Если возвращаемое значение интерфейса равно 404, что означает, что он не существует и никогда не регистрировался, то пройдите процесс регистрации еще раз.
Процесс продления услуги выглядит следующим образом:
Отмена услуги в автономном режиме
Когда служба отключается, необходимо вовремя уведомить сервер, чтобы он удалил себя, чтобы клиент не вызывал автономную службу.Код метода shutdown() выглядит следующим образом:
public synchronized void shutdown() {
if (isShutdown.compareAndSet(false, true)) {
logger.info("Shutting down DiscoveryClient ...");
if (statusChangeListener != null && applicationInfoManager != null) {
applicationInfoManager.unregisterStatusChangeListener(statusChangeListener.getId());
}
// 关闭各种定时任务
// 关闭刷新实例信息/注册的定时任务
// 关闭续约(心跳)的定时任务
// 关闭获取注册信息的定时任务
cancelScheduledTasks();
// If APPINFO was registered
if (applicationInfoManager != null
&& clientConfig.shouldRegisterWithEureka()
&& clientConfig.shouldUnregisterOnShutdown()) {
// 更改实例状态,使实例不再接收流量
applicationInfoManager.setInstanceStatus(InstanceStatus.DOWN);
//向EurekaServer端发送下线请求
unregister();
}
if (eurekaTransport != null) {
eurekaTransport.shutdown();
}
heartbeatStalenessMonitor.shutdown();
registryStalenessMonitor.shutdown();
logger.info("Completed shut down of DiscoveryClient");
}
}
private void cancelScheduledTasks() {
if (instanceInfoReplicator != null) {
instanceInfoReplicator.stop();
}
if (heartbeatExecutor != null) {
heartbeatExecutor.shutdownNow();
}
if (cacheRefreshExecutor != null) {
cacheRefreshExecutor.shutdownNow();
}
if (scheduler != null) {
scheduler.shutdownNow();
}
}
void unregister() {
// It can be null if shouldRegisterWithEureka == false
if(eurekaTransport != null && eurekaTransport.registrationClient != null) {
try {
logger.info("Unregistering ...");
EurekaHttpResponse<Void> httpResponse = eurekaTransport.registrationClient.cancel(instanceInfo.getAppName(), instanceInfo.getId());
logger.info(PREFIX + "{} - deregister status: {}", appPathIdentifier, httpResponse.getStatusCode());
} catch (Exception e) {
logger.error(PREFIX + "{} - de-registration failed{}", appPathIdentifier, e.getMessage(), e);
}
}
}
Сначала закройте различные задачи синхронизации, а затем отправьте уведомление об отключении службы на сервер eureka. Процесс работы службы в автономном режиме выглядит следующим образом:
Ссылаться на
GitHub.com/Netflix/Евро… Е Мин Что/2016/12/01/… блог Brother space.com/spring cloud… woo woo Краткое описание.com/afraid/71 ах 8 должны остерегаться 0…
Добро пожаловать, чтобы отсканировать код или выполнить поиск в общедоступной учетной записи «Programmer Guoguo» на WeChat и подписаться на меня, вас ждут сюрпризы~