Дорога обновления SpringCloud 2020.0.x версии 32. Улучшен алгоритм балансировки нагрузки

Java задняя часть Spring Cloud
Дорога обновления SpringCloud 2020.0.x версии 32. Улучшен алгоритм балансировки нагрузки

Это 11-й день моего участия в ноябрьском испытании обновлений. Узнайте подробности события:Вызов последнего обновления 2021 г.

Кодовый адрес этой серии:GitHub.com/Jojo TE C/SPR…

В предыдущем разделе мы разобрали идеи реализации прерывателей цепи Feign и изоляции потоков, в этом разделе мы не будем рассматривать, как реализовать исходный код (поскольку исходный код будет включать в себя улучшенную часть алгоритма балансировки нагрузки) , давайте обсудим, как оптимизировать текущий алгоритм балансировки нагрузки.

До алгоритмов балансировки нагрузки

  1. получить список экземпляров службы,Сортировка списка экземпляров по ip-порту, если он не отсортирован, даже если позиция следующая, он может представлять экземпляр, который был вызван ранее
  2. На основе traceId в запросе, изПолучите позицию атомарной переменной, начальное значение которой является случайным числом с traceId в качестве ключа в локальном кеше., что предотвращает вызов всех запросов из первого экземпляра, а затем из второго и третьего.
  3. Атом позиции увеличивается на единицу, затем количество экземпляров вычитается, и экземпляр, соответствующий индексу, возвращается для вызова.

Запрос содержит traceId, потому что мы использовали трассировку ссылки spring-cloud-sleuth. На основе этого механизма мы можем гарантировать, что запрос не будет повторен для экземпляра, который был вызван ранее. Исходный код:

//一定必须是实现ReactorServiceInstanceLoadBalancer
//而不是ReactorLoadBalancer<ServiceInstance>
//因为注册的时候是ReactorServiceInstanceLoadBalancer
@Log4j2
public class RoundRobinWithRequestSeparatedPositionLoadBalancer implements ReactorServiceInstanceLoadBalancer {
    private final ServiceInstanceListSupplier serviceInstanceListSupplier;
    //每次请求算上重试不会超过1分钟
    //对于超过1分钟的,这种请求肯定比较重,不应该重试
    private final LoadingCache<Long, AtomicInteger> positionCache = Caffeine.newBuilder().expireAfterWrite(1, TimeUnit.MINUTES)
            //随机初始值,防止每次都是从第一个开始调用
            .build(k -> new AtomicInteger(ThreadLocalRandom.current().nextInt(0, 1000)));
    private final String serviceId;
    private final Tracer tracer;


    public RoundRobinWithRequestSeparatedPositionLoadBalancer(ServiceInstanceListSupplier serviceInstanceListSupplier, String serviceId, Tracer tracer) {
        this.serviceInstanceListSupplier = serviceInstanceListSupplier;
        this.serviceId = serviceId;
        this.tracer = tracer;
    }
    
    //每次重试,其实都会调用这个 choose 方法重新获取一个实例
    @Override
    public Mono<Response<ServiceInstance>> choose(Request request) {
        return serviceInstanceListSupplier.get().next().map(serviceInstances -> getInstanceResponse(serviceInstances));
    }

    private Response<ServiceInstance> getInstanceResponse(List<ServiceInstance> serviceInstances) {
        if (serviceInstances.isEmpty()) {
            log.warn("No servers available for service: " + this.serviceId);
            return new EmptyResponse();
        }
        return getInstanceResponseByRoundRobin(serviceInstances);
    }

    private Response<ServiceInstance> getInstanceResponseByRoundRobin(List<ServiceInstance> serviceInstances) {
        if (serviceInstances.isEmpty()) {
            log.warn("No servers available for service: " + this.serviceId);
            return new EmptyResponse();
        }
        //为了解决原始算法不同调用并发可能导致一个请求重试相同的实例
        //从 sleuth 的 Tracer 中获取当前请求的上下文
        Span currentSpan = tracer.currentSpan();
        //如果上下文不存在,则可能不是前端用户请求,而是其他某些机制触发,我们就创建一个新的上下文
        if (currentSpan == null) {
            currentSpan = tracer.newTrace();
        }
        //从请求上下文中获取请求的 traceId,用来唯一标识一个请求
        long l = currentSpan.context().traceId();
        AtomicInteger seed = positionCache.get(l);
        int s = seed.getAndIncrement();
        int pos = s % serviceInstances.size();
        log.info("position {}, seed: {}, instances count: {}", pos, s, serviceInstances.size());
        return new DefaultResponse(serviceInstances.stream()
                //实例返回列表顺序可能不同,为了保持一致,先排序再取
                .sorted(Comparator.comparing(ServiceInstance::getInstanceId))
                .collect(Collectors.toList()).get(pos));
    }
}

Однако этот алгоритм балансировки нагрузки все равно доставлял нам проблемы, когда было много внезапных запросов.

Прежде всего, это внезапное увеличение, мы не приняли расширение, в результате чегоРабочее давление на этот раз очень чувствительно к сбалансированному распределению давления.. Например, предположим, что есть 9 экземпляров микросервиса А, когда наступает пик бизнеса,Идеальная ситуация состоит в том, чтобы гарантировать, что 9 давлений нагрузки всегда полностью сбалансированы.Но поскольку мы используем атомную переменную position, начальное значение случайных чисел, хотя общее количество с дневной точки зрения, отвечает за балансировку давления, безусловно, уравновешивается, но в течение короткого периода времени, вероятно,Давление все пошло на несколько случаев, в результате чего эти экземпляры были завалены и взорваны, а потом все они побежали к нескольким другим экземплярам, ​​которые были завалены и взорваны снова, такой замкнутый круг.

Затем мы развертываем развертывание k8s, и на одной виртуальной машине может быть много модулей микросервисов. В некоторых случаях несколько модулей одного и того же микросервиса могут работать на одном и том же узле виртуальной машины. Это можно сделать изЭто видно из сетевого сегмента ip модуля.: например, микросервис имеет следующие 7 экземпляров: 10.238.13.12:8181, 10.238.13.24:8181, 10.238.15.12:8181, 10.238.17.12:8181, 10.238.20.220:8181, 10.238.21.10:8. :8181, то 10.238.13.12:8181 и 10.238.13.24:8181, скорее всего, будут на одном узле, а 10.238.21.31:8181 и 10.238.21.121:8181, скорее всего, будут на одном узле. мы пробуем снова,Требуется приоритет для повторной попытки экземпляра ранее повторенных примеров как можно большеПотому что до тех пор, пока один экземпляр на одном узере имеет проблему или слишком много давления, другие в основном имеют проблемы или слишком много давления.

Наконец, если вызов экземпляра продолжает завершаться ошибкой, приоритет вызова этого экземпляра должен быть отнесен к другим обычным экземплярам. Это уменьшит влияние на пользователей быстрого обновления и выпуска (запуск многих экземпляров одновременно, а затем остановка нескольких старых экземпляров, количество экземпляров больше, чем количество повторных попыток) на пользователей, а также внезапное исключение зоны доступности, которое вызывает несколько экземпляров для отключения для пользователей Влияние сети прошло, и бизнес-давление прошло.После того, как давление уменьшится, экземпляры, которые больше не нужны, должны быть закрыты, что окажет большое влияние на пользователей, когда перенесено большое количество экземпляров.

Схема оптимизации для вышеуказанных задач

Мы предлагаем оптимизированное решение трех вышеуказанных проблем:

  1. Для каждого запроса запишите:
  2. Какие экземпляры были вызваны этим запросом ->Запрос называется кеш экземпляра
  3. Экземпляр вызова, сколько запросов обрабатывается в данный момент ->Запросы на запуск экземпляра
  4. Экземпляр вызова, частота ошибок последнего запроса ->Частота ошибок запроса экземпляра
  5. Перетасовать список экземпляров случайным образом, чтобы запросы не всегда отправлялись в один и тот же экземпляр, когда все три вышеуказанные метрики одинаковы.
  6. Сортировать в том порядке, в том, что текущий запрос не был назван ранее, -> меньший уровень ошибок, тем более продвинутой -> меньшее количество запросов на экземпляр, тем выше
  7. Возьмите первый экземпляр в отсортированном списке в качестве экземпляра этой балансировки нагрузки.

Конкретная реализация: следующий код исходит из:GitHub.com/Jojo TE C/SPR…

Мы использовали зависимости:

<dependency>
    <groupId>io.dropwizard.metrics</groupId>
    <artifactId>metrics-core</artifactId>
</dependency>

Класс кеша, который записывает данные экземпляра:

@Log4j2
public class ServiceInstanceMetrics {
	private static final String CALLING = "-Calling";
	private static final String FAILED = "-Failed";

	private MetricRegistry metricRegistry;

	ServiceInstanceMetrics() {
	}

	public ServiceInstanceMetrics(MetricRegistry metricRegistry) {
		this.metricRegistry = metricRegistry;
	}

	/**
	 * 记录调用实例
	 * @param serviceInstance
	 */
	public void recordServiceInstanceCall(ServiceInstance serviceInstance) {
		String key = serviceInstance.getHost() + ":" + serviceInstance.getPort();
		metricRegistry.counter(key + CALLING).inc();
	}
	/**
	 * 记录调用实例结束
	 * @param serviceInstance
	 * @param isSuccess 是否成功
	 */
	public void recordServiceInstanceCalled(ServiceInstance serviceInstance, boolean isSuccess) {
		String key = serviceInstance.getHost() + ":" + serviceInstance.getPort();
		metricRegistry.counter(key + CALLING).dec();
		if (!isSuccess) {
			//不成功则记录失败
			metricRegistry.meter(key + FAILED).mark();
		}
	}

	/**
	 * 获取正在运行的调用次数
	 * @param serviceInstance
	 * @return
	 */
	public long getCalling(ServiceInstance serviceInstance) {
		String key = serviceInstance.getHost() + ":" + serviceInstance.getPort();
		long count = metricRegistry.counter(key + CALLING).getCount();
		log.debug("ServiceInstanceMetrics-getCalling: {} -> {}", key, count);
		return count;
	}

	/**
	 * 获取最近一分钟调用失败次数分钟速率,其实是滑动平均数
	 * @param serviceInstance
	 * @return
	 */
	public double getFailedInRecentOneMin(ServiceInstance serviceInstance) {
		String key = serviceInstance.getHost() + ":" + serviceInstance.getPort();
		double rate = metricRegistry.meter(key + FAILED).getOneMinuteRate();
		log.debug("ServiceInstanceMetrics-getFailedInRecentOneMin: {} -> {}", key, rate);
		return rate;
	}
}

Основной код балансировки нагрузки:

private final LoadingCache<Long, Set<String>> calledIpPrefixes = Caffeine.newBuilder()
        .expireAfterAccess(3, TimeUnit.MINUTES)
        .build(k -> Sets.newConcurrentHashSet());
private final String serviceId;
private final Tracer tracer;
private final ServiceInstanceMetrics serviceInstanceMetrics;

//每次重试,其实都会调用这个 choose 方法重新获取一个实例
@Override
public Mono<Response<ServiceInstance>> choose(Request request) {
    Span span = tracer.currentSpan();
    return serviceInstanceListSupplier.get().next()
            .map(serviceInstances -> {
                //保持 span 和调用 choose 的 span 一样
                try (Tracer.SpanInScope cleared = tracer.withSpanInScope(span)) {
                    return getInstanceResponse(serviceInstances);
                }
            });
}


private Response<ServiceInstance> getInstanceResponse(List<ServiceInstance> serviceInstances) {
    if (serviceInstances.isEmpty()) {
        log.warn("No servers available for service: " + this.serviceId);
        return new EmptyResponse();
    }
    //读取 spring-cloud-sleuth 的对于当前请求的链路追踪上下文,获取对应的 traceId
    Span currentSpan = tracer.currentSpan();
    if (currentSpan == null) {
        currentSpan = tracer.newTrace();
    }
    long l = currentSpan.context().traceId();
    return getInstanceResponseByRoundRobin(l, serviceInstances);
}

@VisibleForTesting
public Response<ServiceInstance> getInstanceResponseByRoundRobin(long traceId, List<ServiceInstance> serviceInstances) {
    //首先随机打乱列表中实例的顺序
    Collections.shuffle(serviceInstances);
    //需要先将所有参数缓存起来,否则 comparator 会调用多次,并且可能在排序过程中参数发生改变(针对实例的请求统计数据一直在并发改变)
    Map<ServiceInstance, Integer> used = Maps.newHashMap();
    Map<ServiceInstance, Long> callings = Maps.newHashMap();
    Map<ServiceInstance, Double> failedInRecentOneMin = Maps.newHashMap();
    serviceInstances = serviceInstances.stream().sorted(
            Comparator
                    //之前已经调用过的网段,这里排后面
                    .<ServiceInstance>comparingInt(serviceInstance -> {
                        return used.computeIfAbsent(serviceInstance, k -> {
                            return calledIpPrefixes.get(traceId).stream().anyMatch(prefix -> {
                                return serviceInstance.getHost().contains(prefix);
                            }) ? 1 : 0;
                        });
                    })
                    //当前错误率最少的
                    .thenComparingDouble(serviceInstance -> {
                        return failedInRecentOneMin.computeIfAbsent(serviceInstance, k -> {
                            double value = serviceInstanceMetrics.getFailedInRecentOneMin(serviceInstance);
                            //由于使用的是移动平均值(EMA),需要忽略过小的差异(保留两位小数,不是四舍五入,而是直接舍弃)
                            return ((int) (value * 100)) / 100.0;
                        });
                    })
                    //当前负载请求最少的
                    .thenComparingLong(serviceInstance -> {
                        return callings.computeIfAbsent(serviceInstance, k ->
                                serviceInstanceMetrics.getCalling(serviceInstance)
                        );
                    })
    ).collect(Collectors.toList());
    if (serviceInstances.isEmpty()) {
        log.warn("No servers available for service: " + this.serviceId);
        return new EmptyResponse();
    }
    ServiceInstance serviceInstance = serviceInstances.get(0);
    //记录本次返回的网段
    calledIpPrefixes.get(traceId).add(serviceInstance.getHost().substring(0, serviceInstance.getHost().lastIndexOf(".")));
    //目前记录这个只为了兼容之前的单元测试(调用次数测试)
    positionCache.get(traceId).getAndIncrement();
    return new DefaultResponse(serviceInstance);
}

Когда обновляется кеш для записи данных инстанса?Это связанный повтор FeignClient, отключите и изолируйте поток кода, который мы увидим в следующем разделе.

Несколько групповых вопросов и ответов по выбору схемы

1. Почему бы не использовать кеш, общий для всех микросервисов, для сохранения данных о звонках, чтобы сделать эти данные более точными?

Варианты общего кеша включают размещение этих записей данных в Redis или сетке в памяти, такой как Apache Ignite. Но есть две проблемы:

  1. Если записи данных помещаются в дополнительное хранилище, такое как Redis, вся балансировка нагрузки не может быть выполнена, если Redis недоступен. Если вы поместите его в Apache Ignite, если соответствующий узел отключится, соответствующая балансировка нагрузки не может быть выполнена. Это неприемлемо.
  2. Предполагая, что микросервис представляет собой потребность в Microservice B, может возникнуть проблема с экземпляром вызова экземпляра B, но нет проблем с другими случаями вызывающего этого экземпляра B. Например, когда зона доступности отличается от другой зоны доступности, когда сеть перегружена. Если тот же ключ кэша используется для записи данных всех экземпляров вызова экземпляра B, оно явно неточно.

Каждый микросервис использует локальный кеш, данные о вызовах записываются другими экземплярами, здесь, на наш взгляд, не только проще, но и точнее подход.

2. Используйте метод EMA вместо метода окна запроса для подсчета частоты недавних ошибок.

Метод окна запроса, безусловно, самый точный. Например, если мы посчитаем частоту ошибок в последнюю минуту, мы будем кэшировать запросы в последнюю минуту. При чтении мы будем складывать данные кэшированных запросов вместе, чтобы получить среднее значение. , Вот и все. Но этот метод может занимать много памяти для кэширования этих запросов, когда запрос взрывается. В то же время при расчете коэффициента ошибок по мере увеличения количества запросов к кешу для расчета будет потребляться большее количество ЦП. Это того не стоит.

EMA — это метод расчета скользящего среднего, который обычно используется в различных сценариях мониторинга производительности и статистики, таких как динамический расчет размера TLAB в JVM, масштабирование размера области GC G1 и во многих других местах, где JVM необходимо динамически получать соответствующие значения. метод расчета. Вместо того, чтобы кэшировать запрос, он напрямую умножает последнее значение на коэффициент, а затем добавляет старое значение, умноженное на (1 - это соотношение).Этот коэффициент обычно выше 0,5, что указывает на то, что EMA больше соответствует текущему последнему значению. .

Но EMA также приносит еще одну проблему.Мы обнаружим, что по мере работы программы количество знаков после запятой будет очень большим, и мы увидим значения, подобные следующим: 0,00000000123, 0,120000001, 0,120000003, чтобы игнорировать эффект слишком мелких отличий (на самом деле эти эффекты тоже из давно плохой просьбы), мыСортировка только с двумя десятичными знаками.

Поиск «Мое программирование мяу» на WECHAT, следуйте на официальной учетной записи, чистите каждый день, легко улучшите свою технологию и получить различные предложения

Категории