kube-scheduler для анализа исходного кода Kubernetes

Kubernetes

В этом разделе начинается анализ части исходного кода kubernetes, версия основана на последней версии 1.13.4.

Начать анализ

Точки входа основных компонентов Kubernetes находятся вcmdВ каталоге запись kube-schduler находится вscheduler.goвниз, как показано на рисунке

Все компоненты kubernetes запускаются с помощьюcommandформе со ссылкой наspf13библиотека классов
Преобразовав файл конфигурации вcommandформа, вызовExecuteвыполнение метода определеноRunметод
ВходитьrunCommandметод, завершив инициализацию конфигурации, вызвавRunспособ дальнейшего запуска.

Выполнить анализ метода

Метод Run в основном делает следующее:
1. Определить, нужно ли добавлятьVolumeSchedulingновые возможности;
2. Инициализировать соответствующую структуру параметров планирования;
3. Настроить и подготовить трансляцию мероприятия;
4. Конфигурация, связанная с проверкой работоспособности;
5.Metricsсоответствующая конфигурация;
6. Начать всеInformer(kubernetes в основном черезInformerиWorkqueueмеханизм прослушивания изменений в событиях);
7. Определите, нужно лиLeaderElection, что определяет окончательный запуск.

интерфейс планирования

Окончательный интерфейс планирования входит вpkgвнизscheduler.goфайл, который обрабатывает планирование работы, запуская отдельную сопрограмму.

Анализ метода ScheduleOne

scheduleOne, как следует из названия, планирует один под за раз, и общий файл выглядит следующим образом.

// scheduleOne does the entire scheduling workflow for a single pod.  It is serialized on the scheduling algorithm's host fitting.
func (sched *Scheduler) scheduleOne() {
	// 1.从队列中取出待调度的Pod
	pod := sched.config.NextPod()
	// pod could be nil when schedulerQueue is closed
	if pod == nil {
		return
	}
	if pod.DeletionTimestamp != nil {
		sched.config.Recorder.Eventf(pod, v1.EventTypeWarning, "FailedScheduling", "skip schedule deleting pod: %v/%v", pod.Namespace, pod.Name)
		klog.V(3).Infof("Skip schedule deleting pod: %v/%v", pod.Namespace, pod.Name)
		return
	}

	klog.V(3).Infof("Attempting to schedule pod: %v/%v", pod.Namespace, pod.Name)

	// Synchronously attempt to find a fit for the pod.
	start := time.Now()
	// 2.获取待调度Pod匹配的主机名
	suggestedHost, err := sched.schedule(pod)
	if err != nil {
		// schedule() may have failed because the pod would not fit on any host, so we try to
		// preempt, with the expectation that the next time the pod is tried for scheduling it
		// will fit due to the preemption. It is also possible that a different pod will schedule
		// into the resources that were preempted, but this is harmless.
		if fitError, ok := err.(*core.FitError); ok {
			preemptionStartTime := time.Now()
			sched.preempt(pod, fitError)
			metrics.PreemptionAttempts.Inc()
			metrics.SchedulingAlgorithmPremptionEvaluationDuration.Observe(metrics.SinceInMicroseconds(preemptionStartTime))
			metrics.SchedulingLatency.WithLabelValues(metrics.PreemptionEvaluation).Observe(metrics.SinceInSeconds(preemptionStartTime))
			// Pod did not fit anywhere, so it is counted as a failure. If preemption
			// succeeds, the pod should get counted as a success the next time we try to
			// schedule it. (hopefully)
			metrics.PodScheduleFailures.Inc()
		} else {
			klog.Errorf("error selecting node for pod: %v", err)
			metrics.PodScheduleErrors.Inc()
		}
		return
	}
	metrics.SchedulingAlgorithmLatency.Observe(metrics.SinceInMicroseconds(start))
	// Tell the cache to assume that a pod now is running on a given node, even though it hasn't been bound yet.
	// This allows us to keep scheduling without waiting on binding to occur.
	// 3.Pod与Node缓存,保证调度一直进行,不用等待每次绑定完成(绑定是一个耗时的过程)
	assumedPod := pod.DeepCopy()

	// Assume volumes first before assuming the pod.
	//
	// If all volumes are completely bound, then allBound is true and binding will be skipped.
	//
	// Otherwise, binding of volumes is started after the pod is assumed, but before pod binding.
	//
	// This function modifies 'assumedPod' if volume binding is required.
	// 4.判断是否需要VolumeScheduling特性
	allBound, err := sched.assumeVolumes(assumedPod, suggestedHost)
	if err != nil {
		klog.Errorf("error assuming volumes: %v", err)
		metrics.PodScheduleErrors.Inc()
		return
	}

	// assume modifies `assumedPod` by setting NodeName=suggestedHost
	// 5.Pod对应的NodeName写上主机名,存入缓存
	err = sched.assume(assumedPod, suggestedHost)
	if err != nil {
		klog.Errorf("error assuming pod: %v", err)
		metrics.PodScheduleErrors.Inc()
		return
	}
	// bind the pod to its host asynchronously (we can do this b/c of the assumption step above).
	// 6.请求apiserver,异步处理最终的绑定,写入到etcd
	go func() {
		// Bind volumes first before Pod
		if !allBound {
			err := sched.bindVolumes(assumedPod)
			if err != nil {
				klog.Errorf("error binding volumes: %v", err)
				metrics.PodScheduleErrors.Inc()
				return
			}
		}

		err := sched.bind(assumedPod, &v1.Binding{
			ObjectMeta: metav1.ObjectMeta{Namespace: assumedPod.Namespace, Name: assumedPod.Name, UID: assumedPod.UID},
			Target: v1.ObjectReference{
				Kind: "Node",
				Name: suggestedHost,
			},
		})
		metrics.E2eSchedulingLatency.Observe(metrics.SinceInMicroseconds(start))
		if err != nil {
			klog.Errorf("error binding pod: %v", err)
			metrics.PodScheduleErrors.Inc()
		} else {
			metrics.PodScheduleSuccesses.Inc()
		}
	}()
}

В основном выполняют следующие работы:
1. Возьмите Pod для планирования из очереди
2. Получите хост, соответствующий поду, который должен быть запланирован в соответствии с алгоритмом планирования (предварительный выбор + предпочтение).Если подходящий хост не получен, определите, нужен ли онpreempt, то есть стратегия вытеснения Pod, назначение узлов Pod
3. Кэшировать текущий под, предполагая, что привязка прошла успешно (главным образом для разделения процесса планирования и привязки).
4. Определите, требуется ли функция VolumeScheduling для продолжения добавления информации о подах.
5. Запишите имя хоста в NodeName, соответствующее Pod (суть планирования состоит в том, чтобы записать соответствующее значение Node для пустого NodeName)
6. Запустите новую сопрограмму привязки, запросите apiserver, обработайте окончательную привязку асинхронно и запишите результат в etcd.

Алгоритм планирования

Окончательное расписание находится наgeneric_scheduler.goизScheduleметод. Планирование в основном делится на два этапа,предварительный отборипредпочтительный.

предварительный отбор

Интерфейс, вызываемый алгоритмом предварительного выбора,findNodesThatFit, основной код выглядит следующим образом:

// Filters the nodes to find the ones that fit based on the given predicate functions
// Each node is passed through the predicate functions to determine if it is a fit
func (g *genericScheduler) findNodesThatFit(pod *v1.Pod, nodes []*v1.Node) ([]*v1.Node, FailedPredicateMap, error) {
	var filtered []*v1.Node
	failedPredicateMap := FailedPredicateMap{}

	// 该if表示,如果没有配置预选的算法,则直接将所有的Node写入匹配数组
	if len(g.predicates) == 0 {
		filtered = nodes
	} else {
		allNodes := int32(g.cache.NodeTree().NumNodes)
		// numFeasibleNodesToFind保证一次性不用返回过多的Node数量,避免数组过大
		numNodesToFind := g.numFeasibleNodesToFind(allNodes)

		// Create filtered list with enough space to avoid growing it
		// and allow assigning.
		filtered = make([]*v1.Node, numNodesToFind)
		errs := errors.MessageCountMap{}
		var (
			predicateResultLock sync.Mutex
			filteredLen         int32
			equivClass          *equivalence.Class
		)

		ctx, cancel := context.WithCancel(context.Background())

		// We can use the same metadata producer for all nodes.
		meta := g.predicateMetaProducer(pod, g.cachedNodeInfoMap)

		if g.equivalenceCache != nil {
			// getEquivalenceClassInfo will return immediately if no equivalence pod found
			equivClass = equivalence.NewClass(pod)
		}

		// checkNode处理预选策略
		checkNode := func(i int) {
			var nodeCache *equivalence.NodeCache
			// 每次获取Node信息
			nodeName := g.cache.NodeTree().Next()
			if g.equivalenceCache != nil {
				nodeCache = g.equivalenceCache.LoadNodeCache(nodeName)
			}
			fits, failedPredicates, err := podFitsOnNode(
				pod,
				meta,
				g.cachedNodeInfoMap[nodeName],
				g.predicates,
				nodeCache,
				g.schedulingQueue,
				g.alwaysCheckAllPredicates,
				equivClass,
			)
			if err != nil {
				predicateResultLock.Lock()
				errs[err.Error()]++
				predicateResultLock.Unlock()
				return
			}
			if fits {
				// 保证获取的Node数量在numNodesToFind内
				length := atomic.AddInt32(&filteredLen, 1)
				if length > numNodesToFind {
					// 通知ParallelizeUntil任务结束
					cancel()
					atomic.AddInt32(&filteredLen, -1)
				} else {
					filtered[length-1] = g.cachedNodeInfoMap[nodeName].Node()
				}
			} else {
				predicateResultLock.Lock()
				failedPredicateMap[nodeName] = failedPredicates
				predicateResultLock.Unlock()
			}
		}

		// Stops searching for more nodes once the configured number of feasible nodes
		// are found.
		// 并行处理多个Node的checkNode工作
		workqueue.ParallelizeUntil(ctx, 16, int(allNodes), checkNode)

		filtered = filtered[:filteredLen]
		if len(errs) > 0 {
			return []*v1.Node{}, FailedPredicateMap{}, errors.CreateAggregateFromMessageCountMap(errs)
		}
	}

	if len(filtered) > 0 && len(g.extenders) != 0 {
		for _, extender := range g.extenders {
			if !extender.IsInterested(pod) {
				continue
			}
			filteredList, failedMap, err := extender.Filter(pod, filtered, g.cachedNodeInfoMap)
			if err != nil {
				if extender.IsIgnorable() {
					klog.Warningf("Skipping extender %v as it returned error %v and has ignorable flag set",
						extender, err)
					continue
				} else {
					return []*v1.Node{}, FailedPredicateMap{}, err
				}
			}

			for failedNodeName, failedMsg := range failedMap {
				if _, found := failedPredicateMap[failedNodeName]; !found {
					failedPredicateMap[failedNodeName] = []algorithm.PredicateFailureReason{}
				}
				failedPredicateMap[failedNodeName] = append(failedPredicateMap[failedNodeName], predicates.NewFailureReason(failedMsg))
			}
			filtered = filteredList
			if len(filtered) == 0 {
				break
			}
		}
	}
	return filtered, failedPredicateMap, nil
}

findNodesThatFitВ основном делать несколько операций
1. Определить, настроен ли алгоритм предварительного выбора, если нет, напрямую вернуть информацию о списке узлов;
2. Если настроен алгоритм предварительного выбора, вызывать несколько узлов (до 16 за раз) одновременно.checkNodeметод определения того, можно ли запланировать Pod на узле;
3. После предварительного отбора, если настроен алгоритм расширения расписания, необходимо снова продолжить просмотр отфильтрованных модулей и узлов, чтобы получить окончательный список соответствующих узлов.
Здесь следует отметить, что при получении количества совпадающих узлов Node передайтеnumFeasibleNodesToFindФункция ограничивает количество узлов, получаемых каждый раз, максимальное значение равно100. Таким образом, когда соответствующий номер узла совпадает,checkNodeМетод больше не вызывается.
Я лично чувствую, что здесь есть некоторые проблемы.Когда количество узлов достаточно велико (больше 100), из-заnumFeasibleNodesToFindКоличество Узлов ограничено, поэтому не все Узлы могут быть просканированы, что может привести к тому, что наиболее подходящие Узлы не будут просканированы, и будут сопоставляться только Узлы с более высоким приоритетом, а окончательные запланированные Узлы не будут самыми подходящими Узлами. является более подходящим.
Интерфейс, который, наконец, реализует суждение о планировании:podFitsOnNode.
podFitsOnNodeСамое сложное для понимания то, что цикл for повторяется дважды, по комментариям общий смысл такой:
1. Первый цикл, все приоритеты выше или равныnominatedPodsПрисоединяйтесь к узлу, обновитеmetaиnodeInfo.nominatedPodsОтносится к модулям, которые были выделены для узла, но фактически не запущены. Это может гарантировать, что поды с высоким приоритетом не вызовут сбоев планирования из-за добавления текущих подов;
2. Второе планирование, неnominatedPodsПрисоединяйтесь к узлу. Причина этого в том, что при рассмотрении таких политик, как привязка пода, если текущий под зависит отnominatedPods, будет проблема. так как,nominatedPodsНет гарантии, что его можно запланировать на соответствующий узел.

// podFitsOnNode checks whether a node given by NodeInfo satisfies the given predicate functions.
// For given pod, podFitsOnNode will check if any equivalent pod exists and try to reuse its cached
// predicate results as possible.
// This function is called from two different places: Schedule and Preempt.
// When it is called from Schedule, we want to test whether the pod is schedulable
// on the node with all the existing pods on the node plus higher and equal priority
// pods nominated to run on the node.
// When it is called from Preempt, we should remove the victims of preemption and
// add the nominated pods. Removal of the victims is done by SelectVictimsOnNode().
// It removes victims from meta and NodeInfo before calling this function.
// ---
// podFitsOnNode根据给定的NodeInfo判断是否匹配相应的预选函数
// 对于一个给定的Pod,podFitsOnNode会检查之前是否有等价的Pod,这样就可以直接复用等价Pod的预选结果
// 该函数会有两个地方调用:Schedule和Preempt
// 当Schedule(正常调度)的时候,判断Node上所有已经存在的Pod和将被指定将要调度到这个Node上的其他所有高优先级Pod外,当前的Pod是否可以调度
// 当Preempt(抢占式)的时候,待定。。。
func podFitsOnNode(
	pod *v1.Pod,
	meta algorithm.PredicateMetadata,
	info *schedulercache.NodeInfo,
	predicateFuncs map[string]algorithm.FitPredicate,
	nodeCache *equivalence.NodeCache,
	queue internalqueue.SchedulingQueue,
	alwaysCheckAllPredicates bool,
	equivClass *equivalence.Class,
) (bool, []algorithm.PredicateFailureReason, error) {
	var (
		eCacheAvailable  bool
		failedPredicates []algorithm.PredicateFailureReason
	)

	podsAdded := false
	// We run predicates twice in some cases. If the node has greater or equal priority
	// nominated pods, we run them when those pods are added to meta and nodeInfo.
	// If all predicates succeed in this pass, we run them again when these
	// nominated pods are not added. This second pass is necessary because some
	// predicates such as inter-pod affinity may not pass without the nominated pods.
	// If there are no nominated pods for the node or if the first run of the
	// predicates fail, we don't run the second pass.
	// We consider only equal or higher priority pods in the first pass, because
	// those are the current "pod" must yield to them and not take a space opened
	// for running them. It is ok if the current "pod" take resources freed for
	// lower priority pods.
	// Requiring that the new pod is schedulable in both circumstances ensures that
	// we are making a conservative decision: predicates like resources and inter-pod
	// anti-affinity are more likely to fail when the nominated pods are treated
	// as running, while predicates like pod affinity are more likely to fail when
	// the nominated pods are treated as not running. We can't just assume the
	// nominated pods are running because they are not running right now and in fact,
	// they may end up getting scheduled to a different node.
	// 两次循环的原因主要就是因为NominatedPods调度的不一定就是此Node,还有Pod的亲和性等问题
	for i := 0; i < 2; i++ {
		metaToUse := meta
		nodeInfoToUse := info
		if i == 0 {
			// 第一次调度,根据NominatedPods更新meta和nodeInfo信息,pod根据更新后的信息去预选
			// 第二次调度,meta和nodeInfo信息不变,保证pod不完全依赖于NominatedPods(主要考虑到pod亲和性之类的)
			podsAdded, metaToUse, nodeInfoToUse = addNominatedPods(pod, meta, info, queue)
		} else if !podsAdded || len(failedPredicates) != 0 {
			break
		}
		// Bypass eCache if node has any nominated pods.
		// TODO(bsalamat): consider using eCache and adding proper eCache invalidations
		// when pods are nominated or their nominations change.
		eCacheAvailable = equivClass != nil && nodeCache != nil && !podsAdded
		for predicateID, predicateKey := range predicates.Ordering() {
			var (
				fit     bool
				reasons []algorithm.PredicateFailureReason
				err     error
			)
			//TODO (yastij) : compute average predicate restrictiveness to export it as Prometheus metric
			if predicate, exist := predicateFuncs[predicateKey]; exist {
				if eCacheAvailable {
					fit, reasons, err = nodeCache.RunPredicate(predicate, predicateKey, predicateID, pod, metaToUse, nodeInfoToUse, equivClass)
				} else {
					fit, reasons, err = predicate(pod, metaToUse, nodeInfoToUse)
				}
				if err != nil {
					return false, []algorithm.PredicateFailureReason{}, err
				}

				if !fit {
					// eCache is available and valid, and predicates result is unfit, record the fail reasons
					failedPredicates = append(failedPredicates, reasons...)
					// if alwaysCheckAllPredicates is false, short circuit all predicates when one predicate fails.
					if !alwaysCheckAllPredicates {
						klog.V(5).Infoln("since alwaysCheckAllPredicates has not been set, the predicate " +
							"evaluation is short circuited and there are chances " +
							"of other predicates failing as well.")
						break
					}
				}
			}
		}
	}

	return len(failedPredicates) == 0, failedPredicates, nil
}

После этого по заранее выбранному алгоритму планирования один за другим судят, все ли они удовлетворены. Здесь есть небольшая оптимизация: если у текущего пода был аналогичный под, соответствующий предыдущий результат будет возвращен непосредственно из кеша. В случае успеха не продолжайте вызывать алгоритм предварительного выбора. Тем не менее, у меня лично есть некоторые сомнения по поводу части кеша: может быть, кешированный результат предыдущего пода был успешным, но на этот раз информация об узле изменилась, и кешированный результат успешен, но на самом деле он может быть не успешным.

Предварительно выбранный алгоритм планирования

В этом разделе в основном рассказывается об алгоритме планирования по умолчанию. Код по умолчанию находится вpkg/scheduler/algorithmprovider/defaults/defaults.goВниз,defaultPredicatesМетод возвращает набор алгоритмов предварительного выбора по умолчанию. Код, относящийся к предварительному выбору, находится вpkg/scheduler/algorithm/predicates/predicates.goВниз

Для каждого алгоритма планирования существует порядок приоритета,Официальный сайтЕсть подробные описания.
Метод планирования в основном такой же, а параметры(pod *v1.Pod, meta algorithm.PredicateMetadata, nodeInfo *schedulercache.NodeInfo), возвращаемое значение равно(bool, []algorithm.PredicateFailureReason, error).

предпочтительный

После завершения предварительного выбора будет получен массив узлов. Если количество предварительно выбранных подходящих узлов больше 1, необходимо вызвать алгоритм оптимизации для получения оптимального узла в соответствии с оценкой.
Интерфейс, вызываемый предпочтительным алгоритмом:PrioritizeNodes, используя метод многозадачного синхронного вызова, аналогичный предварительному выбору, используя идею MapReduce, Map получает значение определенного Node по разным алгоритмам оптимизации, и подсчитывает конечный результат по Reduce.

Предпочтительный алгоритм планирования

Код по умолчанию предпочтительного алгоритма планирования находится вpkg/scheduler/algorithmprovider/defaults/defaults.goВниз,defaultPrioritiesМетод возвращает серию предпочтительных алгоритмов по умолчанию, и соответствующие предпочтительные алгоритмы обрабатываются в фабричном режиме.Код выглядит следующим образом.

func defaultPriorities() sets.String {
	return sets.NewString(
		// spreads pods by minimizing the number of pods (belonging to the same service or replication controller) on the same node.
		factory.RegisterPriorityConfigFactory(
			"SelectorSpreadPriority",
			factory.PriorityConfigFactory{
				MapReduceFunction: func(args factory.PluginFactoryArgs) (algorithm.PriorityMapFunction, algorithm.PriorityReduceFunction) {
					return priorities.NewSelectorSpreadPriority(args.ServiceLister, args.ControllerLister, args.ReplicaSetLister, args.StatefulSetLister)
				},
				Weight: 1,
			},
		),
		// pods should be placed in the same topological domain (e.g. same node, same rack, same zone, same power domain, etc.)
		// as some other pods, or, conversely, should not be placed in the same topological domain as some other pods.
		factory.RegisterPriorityConfigFactory(
			"InterPodAffinityPriority",
			factory.PriorityConfigFactory{
				Function: func(args factory.PluginFactoryArgs) algorithm.PriorityFunction {
					return priorities.NewInterPodAffinityPriority(args.NodeInfo, args.NodeLister, args.PodLister, args.HardPodAffinitySymmetricWeight)
				},
				Weight: 1,
			},
		),

		// Prioritize nodes by least requested utilization.
		factory.RegisterPriorityFunction2("LeastRequestedPriority", priorities.LeastRequestedPriorityMap, nil, 1),

		// Prioritizes nodes to help achieve balanced resource usage
		factory.RegisterPriorityFunction2("BalancedResourceAllocation", priorities.BalancedResourceAllocationMap, nil, 1),

		// Set this weight large enough to override all other priority functions.
		// TODO: Figure out a better way to do this, maybe at same time as fixing #24720.
		factory.RegisterPriorityFunction2("NodePreferAvoidPodsPriority", priorities.CalculateNodePreferAvoidPodsPriorityMap, nil, 10000),

		// Prioritizes nodes that have labels matching NodeAffinity
		factory.RegisterPriorityFunction2("NodeAffinityPriority", priorities.CalculateNodeAffinityPriorityMap, priorities.CalculateNodeAffinityPriorityReduce, 1),

		// Prioritizes nodes that marked with taint which pod can tolerate.
		factory.RegisterPriorityFunction2("TaintTolerationPriority", priorities.ComputeTaintTolerationPriorityMap, priorities.ComputeTaintTolerationPriorityReduce, 1),

		// ImageLocalityPriority prioritizes nodes that have images requested by the pod present.
		factory.RegisterPriorityFunction2("ImageLocalityPriority", priorities.ImageLocalityPriorityMap, nil, 1),
	)
}

Используемый оптимальный алгоритм в основном виден из структуры кода.

Каждая из различных предпочтительных стратегий выделена в отдельный файл.
После прохождения предпочтения звонитеselectHostспособ получить узел с наибольшим количеством очков. Если несколько узлов имеют одинаковую оценку, используйте метод опроса, чтобы получить окончательный узел.

упреждающее планирование

Когда подходящий узел не найден в ходе обычного процесса планирования (в основном из-за отсутствия подходящего узла в предварительном выборе), будет принято решение о том, нужно ли его выполнять или нет.упреждающее планирование, конкретный код находится вpkg/scheduler/scheduler.goПод файлом используется методpreempt, подробности следующим образом:

// preempt tries to create room for a pod that has failed to schedule, by preempting lower priority pods if possible.
// If it succeeds, it adds the name of the node where preemption has happened to the pod annotations.
// It returns the node name and an error if any.
// ---
// preempt尽可能的通过去抢占低优先级的Pod的空间,为调度失败的Pod创造空间
// 如果成功了,就会去添加在Pod注解中声明的Node名称
// 返回Node名称和错误(如果有错误的话)
func (sched *Scheduler) preempt(preemptor *v1.Pod, scheduleErr error) (string, error) {

	// 1.判断是否开启Pod优先级,调度器是否配置了DisablePreemption,两者中任一满足即停止抢占
	if !util.PodPriorityEnabled() || sched.config.DisablePreemption {
		klog.V(3).Infof("Pod priority feature is not enabled or preemption is disabled by scheduler configuration." +
			" No preemption is performed.")
		return "", nil
	}
	// 2.获取待抢占Pod的信息
	preemptor, err := sched.config.PodPreemptor.GetUpdatedPod(preemptor)
	if err != nil {
		klog.Errorf("Error getting the updated preemptor pod object: %v", err)
		return "", err
	}

	// 3.根据配置的算法获取抢占的节点
	// 获取到的四个参数
	// 1.抢占获取到的Node
	// 2.需要被删除掉的低优先级的Pod列表
	// 3.需要删除掉的nominatedPods列表
	// 4.错误信息
	node, victims, nominatedPodsToClear, err := sched.config.Algorithm.Preempt(preemptor, sched.config.NodeLister, scheduleErr)
	metrics.PreemptionVictims.Set(float64(len(victims)))
	if err != nil {
		klog.Errorf("Error preempting victims to make room for %v/%v.", preemptor.Namespace, preemptor.Name)
		return "", err
	}
	var nodeName = ""
	if node != nil {
		// 1.将Pod和Node结合,更新相应的信息(Pod的nodeName有值),并且构造apiserver的调用
		// 2.所有的将要被删除的Pod一一被删除
		// 只有两者都满足了,才能保证抢占成功
		nodeName = node.Name
		// Update the scheduling queue with the nominated pod information. Without
		// this, there would be a race condition between the next scheduling cycle
		// and the time the scheduler receives a Pod Update for the nominated pod.
		sched.config.SchedulingQueue.UpdateNominatedPodForNode(preemptor, nodeName)

		// Make a call to update nominated node name of the pod on the API server.
		err = sched.config.PodPreemptor.SetNominatedNodeName(preemptor, nodeName)
		if err != nil {
			klog.Errorf("Error in preemption process. Cannot update pod %v/%v annotations: %v", preemptor.Namespace, preemptor.Name, err)
			sched.config.SchedulingQueue.DeleteNominatedPodIfExists(preemptor)
			return "", err
		}

		for _, victim := range victims {
			if err := sched.config.PodPreemptor.DeletePod(victim); err != nil {
				klog.Errorf("Error preempting pod %v/%v: %v", victim.Namespace, victim.Name, err)
				return "", err
			}
			sched.config.Recorder.Eventf(victim, v1.EventTypeNormal, "Preempted", "by %v/%v on node %v", preemptor.Namespace, preemptor.Name, nodeName)
		}
	}
	// Clearing nominated pods should happen outside of "if node != nil". Node could
	// be nil when a pod with nominated node name is eligible to preempt again,
	// but preemption logic does not find any node for it. In that case Preempt()
	// function of generic_scheduler.go returns the pod itself for removal of the annotation.
	// 4.删除nominatedPods,不要求一定成功,对整体结果不影响
	for _, p := range nominatedPodsToClear {
		rErr := sched.config.PodPreemptor.RemoveNominatedNodeName(p)
		if rErr != nil {
			klog.Errorf("Cannot remove nominated node annotation of pod: %v", rErr)
			// We do not return as this error is not critical.
		}
	}
	return nodeName, err
}

Общая структура кода относительно ясна и состоит из следующих шагов:
1. Чтобы решить, требуется ли упреждающее планирование, есть в основном два элемента оценки (включен ли PodPriority и настроен ли планировщик с DisablePreemption), оба из которых необходимы;
2. Получите информацию о конфигурации Pod, которую необходимо вытеснить и запланировать;
3. Получить результат планирования вытеснения путем настройки стратегии вытеснения алгоритма (самый основной шаг);
4. Завершение работы (обновить информацию о подах, удалить низкоприоритетные поды, удалить некоторые ресурсы, такие как назначенные поды)
Ядром всего процесса является интерфейс для алгоритма планирования для получения результата планирования.generic_scheduler.goфайл, методPreempt.
PreemptМетод возвращает четыре параметра, которые
1) Узел, полученный с помощью Preempt;
2) Список вытесняемых подов (подлежит удалению);
3) NominatedPods, подлежащие очистке (подлежит очистке);
4) Сообщения об ошибках, которые могут быть возвращены
PreemptМетод в основном выполняет следующие шаги:
1. Получите узлы, которые можно использовать для упреждающего планирования, из узлов, не прошедших предварительный выбор, и передайтеswitchОператор исключает узлы, которые нельзя использовать для упреждающего планирования.

Как показано на рисунке, до тех пор, пока причиной сбоя предварительного выбора является вышеупомянутая причина ошибки, узел планирования вытеснения больше нельзя использовать;
2. Получите список PDB (бюджет прерываний Pod) для последующих критериев оценки;
3. ПозвонивselectNodesForPreemptionметод для определения того, какие узлы могут выполнять упреждающее планирование. пройти черезParallelizeUntilМетод синхронно оценивает все узлы и оценивает путь какcheckNode-->selectVictimsOnNode-->podFitsOnNode, который в конечном итоге похож на метод предварительного отбора, использующийpodFitsOnNodeметод. В отличие от обычного предварительного выбора, упреждающее планирование сначала оценивает приоритет пода, а затем вызывает его после удаления пода с более низким приоритетом.podFitsOnNodeметод, чтобы добиться эффекта упреждения.selectNodesForPreemptionПараметр, возвращаемый методом, представляет собой значение типа карты, ключ — информацию об узле, а значение — некоторую информацию, которая будет очищена, если узел используется в качестве узла планирования, включая информацию о модулях и PDB.

4. После получения ресурсов узлов, которые могут быть получены за счет упреждающего планирования, продолжайте фильтрацию по расширенному алгоритму;
5. Выберите последний упреждающий запланированный узел и вызовитеpickOneNodeForPreemptionМетод в основном основан на 5 принципах:
а) узел с наименьшим значением нарушений PDB;
б) выбрать узел с наименьшим приоритетом жертвы, т. е. поды на очищенном узле, чей приоритет является самым низким;
в) дифференцированные по сумме приоритетов всех подов-жертв (поды с низким приоритетом подлежат удалению);
г) Если сумма приоритетов нескольких узлов по-прежнему равна, выберите узел с наименьшим количеством жертв;
e) Если сумма приоритетов нескольких узлов по-прежнему равна, выбирается первый такой узел (случайно сортируется);
6. После того, как последний узел выбран, запишите NominatedPods с более низким приоритетом на узле.Эти модули еще не были запланированы, и их отношения планирования необходимо удалить и повторно применить. код показывает, как показано ниже:

// preempt finds nodes with pods that can be preempted to make room for "pod" to
// schedule. It chooses one of the nodes and preempts the pods on the node and
// returns 1) the node, 2) the list of preempted pods if such a node is found,
// 3) A list of pods whose nominated node name should be cleared, and 4) any
// possible error.
// Preempt does not update its snapshot. It uses the same snapshot used in the
// scheduling cycle. This is to avoid a scenario where preempt finds feasible
// nodes without preempting any pod. When there are many pending pods in the
// scheduling queue a nominated pod will go back to the queue and behind
// other pods with the same priority. The nominated pod prevents other pods from
// using the nominated resources and the nominated pod could take a long time
// before it is retried after many other pending pods.
func (g *genericScheduler) Preempt(pod *v1.Pod, nodeLister algorithm.NodeLister, scheduleErr error) (*v1.Node, []*v1.Pod, []*v1.Pod, error) {
	// Scheduler may return various types of errors. Consider preemption only if
	// the error is of type FitError.
	fitError, ok := scheduleErr.(*FitError)
	if !ok || fitError == nil {
		return nil, nil, nil, nil
	}
	if !podEligibleToPreemptOthers(pod, g.cachedNodeInfoMap) {
		klog.V(5).Infof("Pod %v/%v is not eligible for more preemption.", pod.Namespace, pod.Name)
		return nil, nil, nil, nil
	}
	allNodes, err := nodeLister.List()
	if err != nil {
		return nil, nil, nil, err
	}
	if len(allNodes) == 0 {
		return nil, nil, nil, ErrNoNodesAvailable
	}
	// 1.获取预选调度失败的节点,但是可能是潜在的抢占可能成功的节点(所有的抢占节点都是在潜在节点内部选择)
	potentialNodes := nodesWherePreemptionMightHelp(allNodes, fitError.FailedPredicates)
	if len(potentialNodes) == 0 {
		klog.V(3).Infof("Preemption will not help schedule pod %v/%v on any node.", pod.Namespace, pod.Name)
		// In this case, we should clean-up any existing nominated node name of the pod.
		return nil, nil, []*v1.Pod{pod}, nil
	}
	// 2.获取PDB(Pod中断预算)列表
	pdbs, err := g.pdbLister.List(labels.Everything())
	if err != nil {
		return nil, nil, nil, err
	}
	// 3.获取所有可以进行Preempt的Node节点的信息,主要包含该节点哪些Pod需要被抢占掉
	nodeToVictims, err := selectNodesForPreemption(pod, g.cachedNodeInfoMap, potentialNodes, g.predicates,
		g.predicateMetaProducer, g.schedulingQueue, pdbs)
	if err != nil {
		return nil, nil, nil, err
	}

	// We will only check nodeToVictims with extenders that support preemption.
	// Extenders which do not support preemption may later prevent preemptor from being scheduled on the nominated
	// node. In that case, scheduler will find a different host for the preemptor in subsequent scheduling cycles.
	// 4.扩展的Preempt调度判断
	nodeToVictims, err = g.processPreemptionWithExtenders(pod, nodeToVictims)
	if err != nil {
		return nil, nil, nil, err
	}

	// 5.选中某一个Node
	candidateNode := pickOneNodeForPreemption(nodeToVictims)
	if candidateNode == nil {
		return nil, nil, nil, err
	}

	// Lower priority pods nominated to run on this node, may no longer fit on
	// this node. So, we should remove their nomination. Removing their
	// nomination updates these pods and moves them to the active queue. It
	// lets scheduler find another place for them.
	// 6.判断哪些Pod优先级较低,后续需要被清除掉,不作为NominatedPods存在
	nominatedPods := g.getLowerPriorityNominatedPods(pod, candidateNode.Name)
	if nodeInfo, ok := g.cachedNodeInfoMap[candidateNode.Name]; ok {
		return nodeInfo.Node(), nodeToVictims[candidateNode].Pods, nominatedPods, err
	}

	return nil, nil, nil, fmt.Errorf(
		"preemption failed: the target node %s has been deleted from scheduler cache",
		candidateNode.Name)
}

Подводя итог, можно сказать, что основным моментом планирования вытеснения являетсяприоритет. В отличие от обычного планирования, упреждающее планирование делает четкое различие приоритетов для модулей, чтобы достичь цели упреждения.

выборы

При запуске планировщика необходимо определить, требуется ли основная операция выбора. Настройка операции выборов очень проста, просто добавьте в файл конфигурации--leader-elect=trueВот и все. В коде, если обнаружен выбор конфигурации, он сначала будет участвовать в выборе, и только планировщик главного узла может выполнять работу, связанную с планированием.
Структура избирательного кода относительно проста, как показано на рисунке, код расположен вclient-goпакет, путьclient-go/tools/leaderelection/leaderelection.go

Есть три основные функцииle.acquire(ctx),le.renew(ctx)а такжеle.config.Callbacks.OnStartedLeading(ctx).
acquireУказывает, успешны ли основные выборы, только после успеха они могут быть выполненыOnStartedLeadingиrenew.OnStartedLeadingЭто метод обратного вызова, который выполняет действия планировщика.runметод.
renewВ основном выполните операцию обновления основного выбора. Когда планировщик на узле выбран в качестве главного, также необходимо постоянно обновлять информацию, чтобы определить, нормально ли функционирует главный узел.
Входитьacquireилиrenewметод, есть общий метод вызоваtryAcquireOrRenew, этот метод является основной реализацией всех выборов.
tryAcquireOrRenewКак следует из названия, если аренда не получена, он получит аренду лидера, в противном случае он продлит аренду. Выделяют три основные операции:
1. звонокGetAction Получает информацию о том, существует ли ElectionRecord. Если его нет, звонитеCreateМетод создает новую конечную точку, текущий узел является главным узлом планировщика и выбор успешен, в противном случае выполняется операция обновления;
2. Получена запись, свидетельствующая о том, что операция продления аренды выполнена.Необходимо проверить личность и время текущего узла, чтобы определить, можно ли выполнить операцию продления аренды;
3. Обновить информацию, выполнитьUpdateОперация по обновлению выбранной основной информации.

// tryAcquireOrRenew tries to acquire a leader lease if it is not already acquired,
// else it tries to renew the lease if it has already been acquired. Returns true
// on success else returns false.
// ---
// tryAcquireOrRenew,如果没有获取到租约,就去获取leader的租约,否则去更新租约。
func (le *LeaderElector) tryAcquireOrRenew() bool {
	now := metav1.Now()
	leaderElectionRecord := rl.LeaderElectionRecord{
		HolderIdentity:       le.config.Lock.Identity(),
		LeaseDurationSeconds: int(le.config.LeaseDuration / time.Second),
		RenewTime:            now,
		AcquireTime:          now,
	}

	// 1. obtain or create the ElectionRecord
	// 1. 调用Endpoint的Get操作,获取oldLeaderElectionRecord
	oldLeaderElectionRecord, err := le.config.Lock.Get()
	if err != nil {
		if !errors.IsNotFound(err) {
			klog.Errorf("error retrieving resource lock %v: %v", le.config.Lock.Describe(), err)
			return false
		}
		// 创建新的Endpoint
		if err = le.config.Lock.Create(leaderElectionRecord); err != nil {
			klog.Errorf("error initially creating leader election record: %v", err)
			return false
		}
		le.observedRecord = leaderElectionRecord
		le.observedTime = le.clock.Now()
		return true
	}

	// 2. Record obtained, check the Identity & Time
	// 2. 获取到了记录,检查下身份和时间信息,判断是否合法
	if !reflect.DeepEqual(le.observedRecord, *oldLeaderElectionRecord) {
		le.observedRecord = *oldLeaderElectionRecord
		le.observedTime = le.clock.Now()
	}
	if le.observedTime.Add(le.config.LeaseDuration).After(now.Time) &&
		!le.IsLeader() {
		klog.V(4).Infof("lock is held by %v and has not yet expired", oldLeaderElectionRecord.HolderIdentity)
		return false
	}

	// 3. We're going to try to update. The leaderElectionRecord is set to it's default
	// here. Let's correct it before updating.
	if le.IsLeader() {
		leaderElectionRecord.AcquireTime = oldLeaderElectionRecord.AcquireTime
		leaderElectionRecord.LeaderTransitions = oldLeaderElectionRecord.LeaderTransitions
	} else {
		leaderElectionRecord.LeaderTransitions = oldLeaderElectionRecord.LeaderTransitions + 1
	}

	// update the lock itself
	if err = le.config.Lock.Update(leaderElectionRecord); err != nil {
		klog.Errorf("Failed to update lock: %v", err)
		return false
	}
	le.observedRecord = leaderElectionRecord
	le.observedTime = le.clock.Now()
	return true
}

Scheduler的选举操作比较简单,主要就是通过判断记录在Etcd中的Endpoints是否可以更新来判断是否可以进行选举。整个选举操作依赖于Etcd的特点来保证分布式操作的成功和唯一。 существуетkube-systemСоответствующую конечную точку можно просмотреть в пространстве имен:kube-scheduler.