«Иллюстрированный принцип работы и процесс контроллера развертывания Kubernetes» был впервые опубликован в:блог Гондурас What/15763918313…
На основе чтения исходного кода Kubernetes v1.16 в этой статье описывается рабочий процесс контроллера развертывания с помощью блок-схем, но также включены соответствующие коды для справки.
Предыдущая статья《Cubernetes Controller Manager работает«Объясняет, как диспетчер контроллера управляет контроллером. Мы знаем, что контроллеру нужно только реализовать соответствующий обработчик событий, и ему больше не нужно заботиться о логике верхнего уровня. Эта статья будет основана на этой статье и описывает, что делает Deployment Controller после получения события от Informer.
Шаблоны развертывания и контроллера
В K8s pod является наименьшей ресурсной единицей, а управление репликами pod осуществляется через ReplicaSet (RS), в то время как развертывание фактически основано на RS для выполнения работы более высокого уровня.
Это режим контроллера Kubernetes. Ресурсы верхнего уровня могут расширять новые возможности, контролируя ресурсы более низкого уровня. Развертывание не управляет модулями напрямую, но обеспечивает контроль над копированием модулей путем управления rs. Развертывание реализует управление версиями, контролируя rs: каждый выпуск соответствует версии, каждая версия имеет rs, а номер версии указан в аннотации, и rs каждый раз запускает соответствующий модуль в соответствии с шаблоном модуля и количеством копий. Развертывание должно только гарантировать, что состояние rs ожидается во всех случаях, а rs гарантирует, что состояние модулей ожидается во всех случаях.
Как K8s управляет развертыванием
Разобравшись с расположением ресурса развертывания в K8s, давайте посмотрим, как этот ресурс достигает ожидаемого состояния.
API и контроллеры Kubernetes основаны на горизонтальном триггере, который может способствовать саморемовестному и циклическому координации системы.
Горизонтальный запуск этой концепции от аппаратного прерывания, прерывание может запускаться по горизонтали или по фронту:
- Горизонтальный триггер: система зависит только от текущего состояния. Даже если система пропустит событие (возможно, из-за сбоя), после восстановления она все равно сможет отреагировать соответствующим образом, просматривая текущее состояние сигнала.
- Запуск по фронту: система зависит не только от текущего состояния, но и от прошлого состояния. Если система пропускает какое-либо событие («край»), событие необходимо повторно посетить, чтобы восстановить систему.
Реализация API горизонтального триггера Kubernetes заключается в том, что контроллер отслеживает фактическое состояние объекта ресурса, сравнивает его с желаемым состоянием объекта, а затем корректирует фактическое состояние, чтобы оно соответствовало желаемому состоянию.
API с горизонтальным запуском также называется декларативным API, а контроллер, который отслеживает объект ресурса развертывания и определяет, что он соответствует ожиданиям, является контроллером развертывания, а соответствующий контроллер rs — контроллером rs.
Deployment Controller
Архитектура
Первый взглядDeploymentController
Определение в K8s:
type DeploymentController struct {
rsControl controller.RSControlInterface
client clientset.Interface
eventRecorder record.EventRecorder
syncHandler func(dKey string) error
enqueueDeployment func(deployment *apps.Deployment)
dLister appslisters.DeploymentLister
rsLister appslisters.ReplicaSetLister
podLister corelisters.PodLister
dListerSynced cache.InformerSynced
rsListerSynced cache.InformerSynced
podListerSynced cache.InformerSynced
queue workqueue.RateLimitingInterface
}
В основном он включает в себя несколько больших частей контента:
-
rsControl
ЯвляетсяReplicaSet Controller
инструменты для принятия и отказа от rs; -
client
Это клиент, который взаимодействует с APIServer; -
eventRecorder
используется для записи событий; -
syncHandler
Используется для обработки синхронизации развертывания; -
enqueueDeployment
способ поставить развертывание в очередь; -
dLister
,rsLister
,podLister
используется отдельно отshared informer store
Способ получения ресурсов; -
dListerSynced
,rsListerSynced
,podListerSynced
используются для идентификацииshared informer store
Был ли он синхронизирован; -
queue
Это рабочая очередь. При изменении развертывания, набора реплик и пода соответствующее развертывание будет помещено в эту очередь.syncHandler()
Метод единообразно обрабатывает развертывание из рабочей очереди.
процесс работы
Давайте посмотрим на рабочий процесс контроллера развертывания.
Контроллер развертывания использует рабочие возможности информера для мониторинга ресурсов и совместной работы с другими контроллерами. В основном используются три общих информера — информер развертывания, информер rs и информер pod.
Во-первых, контроллер развертывания зарегистрирует функции-ловушки с тремя общими информаторами, и три функции-ловушки поместят соответствующее развертывание в рабочую очередь при поступлении соответствующего события.
Когда контроллер развертывания запустится, будет рабочий процесс, который будет управлять функцией syncHandler, выталкивать элементы в рабочую очередь в режиме реального времени и выполнять задачи в соответствии с элементами. В основном это включает в себя: принятие и отказ от rs, распределение событий в eventRecorder и принятие решения о том, как поступать с подчиненными ресурсами в соответствии со стратегией обновления.
workqueue
Сначала посмотрите на рабочую очередь, которая на самом деле является очередью, используемой для помощи информеру в распределении событий. Весь процесс можно представить в виде следующего рисунка.
Как видите, рабочая очередь разделена на три части: одна — очередь «первым поступил — первым обслужен», реализованная слайсами; две другие — карты с именами «грязные» и «обработка». Весь рабочий процесс разделен на три действия: добавить, получить и выполнить.
add — поместить сообщение в очередь. Сообщение приходит от информера.На самом деле входящее сообщение - это не все события, а ключ ресурса, то есть namespace/name.Таким образом бизнес-логика оповещается о том, что наступило событие смены ресурса, и вам нужно взять этот ключ, чтобы получить его из конкретных ресурсов индексатора. Как показано в зеленом процессе на рисунке выше, после обработки сообщения сначала проверьте, существует ли грязное сообщение.Если оно существует, не выполняйте никакой обработки, указывая на то, что событие уже находится в очереди и не нуждается в обработке. повторно; если его нет в грязном, то ключ хранится в грязном (используется как ключ карты, значение представляет собой пустую структуру), а затем копия проталкивается в очередь.
get — это процесс, с помощью которого функция дескриптора получает ключ из очереди. При извлечении ключа из очереди он будет помещен в обработку, а его грязный индекс будет удален. Причина этого шага — поместить элемент в обработку, чтобы отметить, что он обрабатывается, в то время как удаление его из грязного не влияет на постановку в очередь последующих событий.
done — это шаг, который handle должен выполнить после обработки ключа, что эквивалентно отправке ack в workqueue, указывающего, что я закончил его обработку, и это действие просто удаляет его из обработки.
С помощью этой небольшой и красивой очереди «первым поступил — первым обслужен» мы можем избежать повторного получения ресурсов от индексатора, когда происходит несколько событий ресурсов. Разберем конкретный процесс развертывания контроллера.
Претензия ReplicaSet и процесс отказа
В рабочем процессе контроллера развертывания вы можете заметить, что в дополнение к трем функциям ловушек развертывания есть также функции ловушек rs и pod, Среди трех функций ловушек rs это включает в себя принятие и отказ от rs при развертывании .
rs узнать родственников
Во-первых, давайте посмотрим на процесс распознавания rs:
В трех крюковых функциях rs задействован процесс узнавания родственников.
Отслеживая изменение rs, он найдет соответствующее развертывание в очереди по полю ownerReferences rs, если поле пустое, значит, это осиротевший rs, и будет активирован механизм распознавания rs.
Первым шагом в процессе идентификации является просмотр всех развертываний, определение того, соответствует ли селектор развертывания меткам текущего rs, и поиск всех подходящих развертываний.
Затем оцените, сколько всего развертываний, если 0, возвращайтесь сразу, никто не хочет требовать, ничего не делайте, и процесс утверждения завершен; если больше 1, выбрасывайте, потому что это ненормально. поведение и не разрешено Несколько развертываний имеют одинаковый rs одновременно; если есть и только одно развертывание соответствует ему, найдите развертывание, которое готово принять, и поместите его в очередь.
addReplicaSet()
иupdateReplicaSet()
Процесс подачи заявки аналогичен, только показан здесьaddReplicaSet()
код:
func (dc *DeploymentController) addReplicaSet(obj interface{}) {
rs := obj.(*apps.ReplicaSet)
if rs.DeletionTimestamp != nil {
dc.deleteReplicaSet(rs)
return
}
if controllerRef := metav1.GetControllerOf(rs); controllerRef != nil {
d := dc.resolveControllerRef(rs.Namespace, controllerRef)
if d == nil {
return
}
klog.V(4).Infof("ReplicaSet %s added.", rs.Name)
dc.enqueueDeployment(d)
return
}
ds := dc.getDeploymentsForReplicaSet(rs)
if len(ds) == 0 {
return
}
klog.V(4).Infof("Orphan ReplicaSet %s added.", rs.Name)
for _, d := range ds {
dc.enqueueDeployment(d)
}
}
принятие и отказ от развертывания
Процесс принятия и отказа развертывания RS — это процесс обработки элемента из WorkQueue, но также находит все RS текущего развертывания.
Этот процесс будет опрашивать все rs, если есть владелец и это текущее развертывание, затем судить, удовлетворяет ли метка селектору развертывания, и оно будет включено в результат; если нет, запустить механизм отказа и удалить только владелец ссылки на rs, чтобы сделать его сиротой и больше ничего не делать. Это также является причиной того, что после изменения селектора развертывания будет еще один rs с репликами! = 0.
Если rs не имеет владельца и является потерянным, определите, удовлетворяет ли метка селектору развертывания и условиям, затем запустите механизм принятия, установите его ownerReferences на текущее развертывание, а затем перечислите его в результате.
Вот исходный код всего процесса:
func (m *BaseControllerRefManager) ClaimObject(obj metav1.Object, match func(metav1.Object) bool, adopt, release func(metav1.Object) error) (bool, error) {
controllerRef := metav1.GetControllerOf(obj)
if controllerRef != nil {
if controllerRef.UID != m.Controller.GetUID() {
return false, nil
}
if match(obj) {
return true, nil
}
if m.Controller.GetDeletionTimestamp() != nil {
return false, nil
}
if err := release(obj); err != nil {
if errors.IsNotFound(err) {
return false, nil
}
return false, err
}
return false, nil
}
if m.Controller.GetDeletionTimestamp() != nil || !match(obj) {
return false, nil
}
if obj.GetDeletionTimestamp() != nil {
return false, nil
}
if err := adopt(obj); err != nil {
if errors.IsNotFound(err) {
return false, nil
}
return false, err
}
return true, nil
}
Функции принятия и отказа:
func (m *ReplicaSetControllerRefManager) AdoptReplicaSet(rs *apps.ReplicaSet) error {
if err := m.CanAdopt(); err != nil {
return fmt.Errorf("can't adopt ReplicaSet %v/%v (%v): %v", rs.Namespace, rs.Name, rs.UID, err)
}
addControllerPatch := fmt.Sprintf(
`{"metadata":{"ownerReferences":[{"apiVersion":"%s","kind":"%s","name":"%s","uid":"%s","controller":true,"blockOwnerDeletion":true}],"uid":"%s"}}`,
m.controllerKind.GroupVersion(), m.controllerKind.Kind,
m.Controller.GetName(), m.Controller.GetUID(), rs.UID)
return m.rsControl.PatchReplicaSet(rs.Namespace, rs.Name, []byte(addControllerPatch))
}
func (m *ReplicaSetControllerRefManager) ReleaseReplicaSet(replicaSet *apps.ReplicaSet) error {
klog.V(2).Infof("patching ReplicaSet %s_%s to remove its controllerRef to %s/%s:%s",
replicaSet.Namespace, replicaSet.Name, m.controllerKind.GroupVersion(), m.controllerKind.Kind, m.Controller.GetName())
deleteOwnerRefPatch := fmt.Sprintf(`{"metadata":{"ownerReferences":[{"$patch":"delete","uid":"%s"}],"uid":"%s"}}`, m.Controller.GetUID(), replicaSet.UID)
err := m.rsControl.PatchReplicaSet(replicaSet.Namespace, replicaSet.Name, []byte(deleteOwnerRefPatch))
if err != nil {
if errors.IsNotFound(err) {
return nil
}
if errors.IsInvalid(err) {
return nil
}
}
return err
}
rolloutRecreate
Если стратегия обновления развертывания — воссоздать, процесс заключается в удалении старого модуля и запуске нового. Конкретный процесс выглядит следующим образом:
Во-первых, в соответствии с процессом принятия и отказа от rs на предыдущем шаге получите все rs текущего развертывания, отсортируйте и найдите последний rs и сравните его шаблон pod с шаблоном pod развертывания. , вам нужно создать новый rs;
Процесс создания нового rs заключается в вычислении хеш-значения шаблона пода текущего развертывания и добавлении его к метке и селектору rs;
Рассчитаем максимальную ревизию для всех старых rs, прибавим к ней единицу, как ревизию новой rs, и зададим для новой rs следующие аннотации:
"deployment.kubernetes.io/revision"
"deployment.kubernetes.io/desired-replicas"
"deployment.kubernetes.io/max-replicas"
Если ревизия текущего развертывания не самая последняя, сделайте ее самой последней, если статус нужно обновить, обновите его состояние;
Понизьте версию старого rs, то есть установите для его счетчика реплик значение 0;
Чтобы определить, все ли старые модули остановлены, условие оценки состоит в том, что состояние модуля не выполнено или выполнено успешно, неизвестно или все другие состояния не остановлены; если не все модули остановлены, выйдите из этой операции и обработайте ее в следующем цикле;
Если все поды остановлены, обновите новый rs, то есть установите количество реплик равным количеству реплик развертывания;
Наконец, очистите работу, например, удалите лишние rs, когда слишком много старых rs.
Вот исходный код для rolloutRecreate:
func (dc *DeploymentController) rolloutRecreate(d *apps.Deployment, rsList []*apps.ReplicaSet, podMap map[types.UID][]*v1.Pod) error {
newRS, oldRSs, err := dc.getAllReplicaSetsAndSyncRevision(d, rsList, false)
if err != nil {
return err
}
allRSs := append(oldRSs, newRS)
activeOldRSs := controller.FilterActiveReplicaSets(oldRSs)
scaledDown, err := dc.scaleDownOldReplicaSetsForRecreate(activeOldRSs, d)
if err != nil {
return err
}
if scaledDown {
return dc.syncRolloutStatus(allRSs, newRS, d)
}
if oldPodsRunning(newRS, oldRSs, podMap) {
return dc.syncRolloutStatus(allRSs, newRS, d)
}
if newRS == nil {
newRS, oldRSs, err = dc.getAllReplicaSetsAndSyncRevision(d, rsList, true)
if err != nil {
return err
}
allRSs = append(oldRSs, newRS)
}
if _, err := dc.scaleUpNewReplicaSetForRecreate(newRS, d); err != nil {
return err
}
if util.DeploymentComplete(d, &d.Status) {
if err := dc.cleanupDeployment(oldRSs, d); err != nil {
return err
}
}
return dc.syncRolloutStatus(allRSs, newRS, d)
}
rolloutRolling
Если стратегия обновления развертывания — воссоздать, процесс заключается в удалении старого модуля и запуске нового. Конкретный процесс выглядит следующим образом:
Как видно из рисунка выше, процесс от запуска до создания нового rs такой же, как и процесс rolloutRecreate, с той лишь разницей, что процесс установки количества новых реплик rs. В процессе прокатки количество копий нового rs составляетdeploy.replicas + maxSurge - currentPodCount
. код показывает, как показано ниже:
func NewRSNewReplicas(deployment *apps.Deployment, allRSs []*apps.ReplicaSet, newRS *apps.ReplicaSet) (int32, error) {
switch deployment.Spec.Strategy.Type {
case apps.RollingUpdateDeploymentStrategyType:
// Check if we can scale up.
maxSurge, err := intstrutil.GetValueFromIntOrPercent(deployment.Spec.Strategy.RollingUpdate.MaxSurge, int(*(deployment.Spec.Replicas)), true)
if err != nil {
return 0, err
}
// Find the total number of pods
currentPodCount := GetReplicaCountForReplicaSets(allRSs)
maxTotalPods := *(deployment.Spec.Replicas) + int32(maxSurge)
if currentPodCount >= maxTotalPods {
// Cannot scale up.
return *(newRS.Spec.Replicas), nil
}
// Scale up.
scaleUpCount := maxTotalPods - currentPodCount
// Do not exceed the number of desired replicas.
scaleUpCount = int32(integer.IntMin(int(scaleUpCount), int(*(deployment.Spec.Replicas)-*(newRS.Spec.Replicas))))
return *(newRS.Spec.Replicas) + scaleUpCount, nil
case apps.RecreateDeploymentStrategyType:
return *(deployment.Spec.Replicas), nil
default:
return 0, fmt.Errorf("deployment type %v isn't supported", deployment.Spec.Strategy.Type)
}
}
Затем идет процесс увеличения или уменьшения количества старых и новых реплик rs. Главное — сначала масштабировать новый rs, а потом уменьшать старый rs. Процесс увеличения масштаба нового rs такой же, как и выше; процесс уменьшения масштаба старого rs заключается в том, чтобы сначала рассчитать максимальное количество реплик уменьшения масштаба, если оно меньше 0, ничего не делать; затем выполнить оптимизацию при уменьшении масштаба , сначала уменьшите масштаб Для обычного rs гарантированно сначала будут удалены неработоспособные реплики; наконец, если все еще есть баланс, уменьшите масштаб обычного rs.
Количество копий для каждого уменьшения составляетallAvailablePodCount - minAvailable
,СейчасallAvailablePodCount - (deploy.replicas - maxUnavailable)
. код показывает, как показано ниже:
func (dc *DeploymentController) scaleDownOldReplicaSetsForRollingUpdate(allRSs []*apps.ReplicaSet, oldRSs []*apps.ReplicaSet, deployment *apps.Deployment) (int32, error) {
maxUnavailable := deploymentutil.MaxUnavailable(*deployment)
// Check if we can scale down.
minAvailable := *(deployment.Spec.Replicas) - maxUnavailable
// Find the number of available pods.
availablePodCount := deploymentutil.GetAvailableReplicaCountForReplicaSets(allRSs)
if availablePodCount <= minAvailable {
// Cannot scale down.
return 0, nil
}
klog.V(4).Infof("Found %d available pods in deployment %s, scaling down old RSes", availablePodCount, deployment.Name)
sort.Sort(controller.ReplicaSetsByCreationTimestamp(oldRSs))
totalScaledDown := int32(0)
totalScaleDownCount := availablePodCount - minAvailable
for _, targetRS := range oldRSs {
if totalScaledDown >= totalScaleDownCount {
// No further scaling required.
break
}
if *(targetRS.Spec.Replicas) == 0 {
// cannot scale down this ReplicaSet.
continue
}
// Scale down.
scaleDownCount := int32(integer.IntMin(int(*(targetRS.Spec.Replicas)), int(totalScaleDownCount-totalScaledDown)))
newReplicasCount := *(targetRS.Spec.Replicas) - scaleDownCount
if newReplicasCount > *(targetRS.Spec.Replicas) {
return 0, fmt.Errorf("when scaling down old RS, got invalid request to scale down %s/%s %d -> %d", targetRS.Namespace, targetRS.Name, *(targetRS.Spec.Replicas), newReplicasCount)
}
_, _, err := dc.scaleReplicaSetAndRecordEvent(targetRS, newReplicasCount, deployment)
if err != nil {
return totalScaledDown, err
}
totalScaledDown += scaleDownCount
}
return totalScaledDown, nil
}
Суммировать
Поскольку Kubernetes использует декларативный API, контроллер вычисляет ожидаемое состояние на основе текущего события и оценивает, соответствует ли текущее фактическое состояние ожидаемому состоянию, и, если нет, предпринимает действия, чтобы приблизить его.
В этой статье анализируется рабочий процесс контроллера развертывания.Хотя он довольно громоздкий, то, что он делает, сосредоточен на цели «приблизиться к ожиданиям».
Я надеюсь, что эти две статьи помогут вам получить представление о пятне леопарда и понять общий рабочий процесс и реализацию диспетчера контроллеров Kubernetes.