Cubernetes Controller Manager работает

задняя часть Kubernetes
Cubernetes Controller Manager работает

Статья «Как работает диспетчер контроллеров Kubernetes» была впервые опубликована по адресу:blog.I и PO.net/15763910382…

Эта статья основана на чтении исходного кода Kubernetes v1.16, В статье есть некий исходный код, но я постараюсь описать его как можно понятнее с помощью прилагаемых картинок.

В главном узле Kubernetes есть три важных компонента: ApiServer, ControllerManager и Scheduler, которые вместе берут на себя управление всем кластером. В этой статье делается попытка разобраться в рабочем процессе и принципах ControllerManager.

Что такое диспетчер контроллеров

Согласно официальной документации: kube-controller-manager запускает контроллеры — фоновые потоки, обрабатывающие рутинные задачи в кластере.

Проще говоря, Controller Manager — это центр управления и контроля в кластере, состоящий из нескольких контроллеров, отвечающих за разные ресурсы, и совместно отвечающий за управление всеми ресурсами, такими как Node и Pod в кластере.Например, когда Pod, созданный Deployment, аварийно завершает работу, RS Controller примет и обработает событие выхода и создаст новый Pod для поддержки ожидаемого количества реплик.

Почти каждый конкретный ресурс имеет определенный контроллер для поддержки и управления для поддержания ожидаемого состояния, а ответственностью диспетчера контроллеров является объединение всех контроллеров:

  1. Обеспечьте инфраструктуру для снижения сложности реализации контроллера.
  2. Запустить и поддерживать нормальную работу контроллера

Можно сказать, что контроллер гарантирует, что ресурсы внутри кластера остаются в ожидаемом состоянии, а диспетчер контроллера гарантирует, что контроллер остается в ожидаемом состоянии.

Рабочий процесс контроллера

Прежде чем объяснить, как менеджер контроллера обеспечивает инфраструктуру и операционную среду для контроллера, давайте сначала понять, как выглядит рабочий процесс контроллера.

С многомерной точки зрения диспетчер контроллеров в основном предоставляет возможность распределять события, а различным контроллерам нужно только зарегистрировать соответствующие обработчики для ожидания получения и обработки событий.

Возьмем, к примеру, контроллер развертывания вpkg/controller/deployment/deployment_controller.goизNewDeploymentControllerЭтот метод включает регистрацию обработчика событий.Для контроллера развертывания требуется только реализовать различную логику обработки в соответствии с различными событиями для управления соответствующими ресурсами.

dInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
	AddFunc:    dc.addDeployment,
	UpdateFunc: dc.updateDeployment,
	// This will enter the sync loop and no-op, because the deployment has been deleted from the store.
	DeleteFunc: dc.deleteDeployment,
})
rsInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
	AddFunc:    dc.addReplicaSet,
	UpdateFunc: dc.updateReplicaSet,
	DeleteFunc: dc.deleteReplicaSet,
})
podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
	DeleteFunc: dc.deletePod,
})

Видно, что с помощью диспетчера контроллеров логика контроллера может быть очень чистой, просто нужно реализовать соответствующий EventHandler, тогда какую конкретную работу выполняет диспетчер контроллеров?

Архитектура диспетчера контроллеров

Вспомогательным диспетчером контроллеров для завершения распределения событий является client-go, но одним из наиболее важных модулей является информер.

Kubernetes предоставляет схему архитектуры client-go на github, видно, что Controller — это именно то, что описывает нижняя часть (CustomController), а Controller Manager в основном дополняет верхнюю часть.

Фабрика Информеров

Как видно из приведенного выше рисунка, Informer — это очень важная роль «моста», поэтому управление Informer — это первое, что должен сделать Controller Manager.

Когда Controller Manager запускается, он создаетSharedInformerFactoryПоскольку каждый информер будет поддерживать длительное соединение с сервером Api, эта одноэлементная фабрика гарантирует, что каждый тип информера создается только один раз, предоставляя уникальную запись для всех контроллеров для получения информера.

Логика инициализации singleton plant:

// NewSharedInformerFactoryWithOptions constructs a new instance of a SharedInformerFactory with additional options.
func NewSharedInformerFactoryWithOptions(client kubernetes.Interface, defaultResync time.Duration, options ...SharedInformerOption) SharedInformerFactory {
	factory := &sharedInformerFactory{
		client:           client,
		namespace:        v1.NamespaceAll,
		defaultResync:    defaultResync,
		informers:        make(map[reflect.Type]cache.SharedIndexInformer),
		startedInformers: make(map[reflect.Type]bool),
		customResync:     make(map[reflect.Type]time.Duration),
	}

	// Apply all options
	for _, opt := range options {
		factory = opt(factory)
	}

	return factory
}

Как видно из приведенной выше логики инициализации,sharedInformerFactoryВажнейшим из них являетсяinformersКарта, где ключ — тип ресурса, а значение — информер, относящийся к типу ресурса. Каждый тип Информера будет создан только один раз и сохранен на карте.Разные Контроллеры будут получать один и тот же экземпляр Информера только тогда, когда им нужен один и тот же Информер ресурсов.

Для диспетчера контроллеров поддержка всех информеров для обеспечения их нормальной работы является основным условием обеспечения нормальной работы всех контроллеров.sharedInformerFactoryВсе экземпляры информера поддерживаются через эту карту, поэтомуsharedInformerFactoryОн также берет на себя ответственность за предоставление единой стартовой записи:

// Start initializes all requested informers.
func (f *sharedInformerFactory) Start(stopCh <-chan struct{}) {
	f.lock.Lock()
	defer f.lock.Unlock()

	for informerType, informer := range f.informers {
		if !f.startedInformers[informerType] {
			go informer.Run(stopCh)
			f.startedInformers[informerType] = true
		}
	}
}

Когда начинается менеджер контроллера, самое главное - через заводStartСпособы запуска всех Информера.

Создание Информера

Давайте посмотрим, как создаются эти Информеры. Менеджер контроллера вcmd/kube-controller-manager/app/controllermanager.goизNewControllerInitializersВсе контроллеры изначально распознаются в функции.Из-за длинного кода здесь мы берем в качестве примера только контроллер развертывания.

Логика инициализации контроллера развертывания находится вcmd/kube-controller-manager/app/apps.goизstartDeploymentControllerв функции:

func startDeploymentController(ctx ControllerContext) (http.Handler, bool, error) {
	if !ctx.AvailableResources[schema.GroupVersionResource{Group: "apps", Version: "v1", Resource: "deployments"}] {
		return nil, false, nil
	}
	dc, err := deployment.NewDeploymentController(
		ctx.InformerFactory.Apps().V1().Deployments(),
		ctx.InformerFactory.Apps().V1().ReplicaSets(),
		ctx.InformerFactory.Core().V1().Pods(),
		ctx.ClientBuilder.ClientOrDie("deployment-controller"),
	)
	if err != nil {
		return nil, true, fmt.Errorf("error creating Deployment controller: %v", err)
	}
	go dc.Run(int(ctx.ComponentConfig.DeploymentController.ConcurrentDeploymentSyncs), ctx.Stop)
	return nil, true, nil
}

Наиболее критична логикаdeployment.NewDeploymentControllerВыше функция фактически создает контроллер развертывания, а первые три параметра функции создания — это информер развертывания, набор реплик и под. Как видите, singleton factory Информера предоставляет запись о создании Информера для разных ресурсов с ApiGroup в качестве пути.

Обратите внимание, однако, что.Apps().V1().Deployments()Хотя вернувшийсяdeploymentInformerэкземпляр типа, однако,deploymentInformerНа самом деле, это не настоящий Информер (хотя он и назван в честь Информера), это просто класс-шаблон, основной функцией которого является предоставление шаблона для создания конкретного Информера ресурса, ориентированного на Развертывание:

// Deployments returns a DeploymentInformer.
func (v *version) Deployments() DeploymentInformer {
	return &deploymentInformer{factory: v.factory, namespace: v.namespace, tweakListOptions: v.tweakListOptions}
}

Настоящая логика создания Информера находится вdeploymentInformer.Informer()середина(client-go/informers/apps/v1/deployment.go),f.defaultInformerDeployment Informer по умолчанию создает метод шаблона, передавая экземпляр ресурса и метод шаблона в фабрику Informer.InformerForметод для создания Informer, который фокусируется только на ресурсах развертывания:

func (f *deploymentInformer) Informer() cache.SharedIndexInformer {
	return f.factory.InformerFor(&appsv1.Deployment{}, f.defaultInformer)
}

Кратко резюмируем:

  1. Определенный тип класса шаблона Informer можно получить через фабрику Informer (т.е. здесьdeploymentInformer)
  2. Что на самом деле создает этот конкретный ресурс Informer, так это класс шаблона Informer.Informer()метод
  3. иInformer()Методы просто пропускаются через информер заводInformerForСоздать истинный информер

Здесь используется шаблонный метод (паттерн проектирования).Хотя здесь есть некоторая хитрость, но вы можете обратиться к рисунку ниже, чтобы разобраться.Ключ к пониманию кроется в ИнформереЛогика дифференцированного создания делегирована классам шаблонов:

Наконец, назвалsharedIndexInformerСтруктура будет создана и фактически возьмет на себя обязанности Informer. Этот экземпляр также зарегистрирован на заводской карте Informer.

Работа Информера

Потому что настоящий информер экземпляр являетсяsharedIndexInformerОбъект типа, при запуске фабрики Информера (выполняетсяStartметод), на самом деле выполняетсяsharedIndexInformer.

sharedIndexInformerявляется компонентом client-go, егоRunХотя метод состоит всего из нескольких десятков строк, он проделал большую работу. А вот и самая интересная часть Controller Manager.

func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) {
	defer utilruntime.HandleCrash()

	fifo := NewDeltaFIFO(MetaNamespaceKeyFunc, s.indexer)

	cfg := &Config{
		Queue:            fifo,
		ListerWatcher:    s.listerWatcher,
		ObjectType:       s.objectType,
		FullResyncPeriod: s.resyncCheckPeriod,
		RetryOnError:     false,
		ShouldResync:     s.processor.shouldResync,

		Process: s.HandleDeltas,
	}

	func() {
		s.startedLock.Lock()
		defer s.startedLock.Unlock()

		s.controller = New(cfg)
		s.controller.(*controller).clock = s.clock
		s.started = true
	}()

	// Separate stop channel because Processor should be stopped strictly after controller
	processorStopCh := make(chan struct{})
	var wg wait.Group
	defer wg.Wait()              // Wait for Processor to stop
	defer close(processorStopCh) // Tell Processor to stop
	wg.StartWithChannel(processorStopCh, s.cacheMutationDetector.Run)
	wg.StartWithChannel(processorStopCh, s.processor.run)

	defer func() {
		s.startedLock.Lock()
		defer s.startedLock.Unlock()
		s.stopped = true // Don't want any new listeners
	}()
	s.controller.Run(stopCh)
}

sharedIndexInformerЛогика запуска в основном выполняет следующие действия:

  1. создал имяfifoочередь
  2. Создал и запустилcontrollerэкземпляр
  3. началосьcacheMutationDetector
  4. началосьprocessor

Эти термины (или компоненты) не были упомянуты ранее, и эти четыре вещи являются основным содержанием работы диспетчера контроллера, поэтому я введем их отдельно ниже.

sharedIndexInformer

sharedIndexInformerInformer - это общий кадр, другой контроллер нужно предоставить только класс шаблона (например, упомянутый вышеdeploymentInformer), вы можете создать специальный Информер, который соответствует вашим потребностям.

sharedIndexInformerСодержит кучу инструментов для выполнения задачи Информера, его основной код находится вclient-go/tools/cache/shared_informer.goсередина. Логика его создания тоже в нем:

// NewSharedIndexInformer creates a new instance for the listwatcher.
func NewSharedIndexInformer(lw ListerWatcher, objType runtime.Object, defaultEventHandlerResyncPeriod time.Duration, indexers Indexers) SharedIndexInformer {
	realClock := &clock.RealClock{}
	sharedIndexInformer := &sharedIndexInformer{
		processor:                       &sharedProcessor{clock: realClock},
		indexer:                         NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, indexers),
		listerWatcher:                   lw,
		objectType:                      objType,
		resyncCheckPeriod:               defaultEventHandlerResyncPeriod,
		defaultEventHandlerResyncPeriod: defaultEventHandlerResyncPeriod,
		cacheMutationDetector:           NewCacheMutationDetector(fmt.Sprintf("%T", objType)),
		clock:                           realClock,
	}
	return sharedIndexInformer
}

В логике создания есть несколько вещей, о которых нужно знать:

  1. процессор: Обеспечивает функцию регистрации EventHandler и распределения событий
  2. Индексатор: обеспечивает функцию кэширования ресурсов
  3. listerWatcher: предоставляется классом шаблона, который содержит определенный ресурс и метод Watch List.
  4. objectType: используется для обозначения конкретного типа ресурса.
  5. cacheMutationDetector: Мониторинг кеша Informer

Кроме того, он также включает в себя логику запуска, упомянутую выше.DeltaFIFOочередь иcontroller, которые представлены отдельно ниже.

sharedProcessor

Процессор — очень интересный компонент в sharedIndexInformer.Диспетчер контроллеров использует фабрику singleton Informer, чтобы гарантировать, что разные контроллеры используют один и тот же информер, но разные контроллеры имеют разные обработчики, зарегистрированные для общего информера, так как же информер должен управлять зарегистрированным информером? Что с Хэндлером?

Процессор — это компонент, используемый для управления зарегистрированными обработчиками и распределения событий по разным обработчикам.

type sharedProcessor struct {
	listenersStarted bool
	listenersLock    sync.RWMutex
	listeners        []*processorListener
	syncingListeners []*processorListener
	clock            clock.Clock
	wg               wait.Group
}

Суть работы sharedProcessor вращается вокругlistenersЭтот фрагмент прослушивателя расширяется.

Когда мы регистрируем Handler в Informer, он в конечном итоге будет преобразован вprocessorListenerПримеры структур:

func newProcessListener(handler ResourceEventHandler, requestedResyncPeriod, resyncPeriod time.Duration, now time.Time, bufferSize int) *processorListener {
	ret := &processorListener{
		nextCh:                make(chan interface{}),
		addCh:                 make(chan interface{}),
		handler:               handler,
		pendingNotifications:  *buffer.NewRingGrowing(bufferSize),
		requestedResyncPeriod: requestedResyncPeriod,
		resyncPeriod:          resyncPeriod,
	}

	ret.determineNextResync(now)

	return ret
}

Этот экземпляр в основном содержит два канала и методы Handler, зарегистрированные снаружи. И здесь создан экземплярprocessorListenerВ конечном итоге объект будет добавлен вsharedProcessor.listenersСписок:

func (p *sharedProcessor) addListener(listener *processorListener) {
	p.listenersLock.Lock()
	defer p.listenersLock.Unlock()

	p.addListenerLocked(listener)
	if p.listenersStarted {
		p.wg.Start(listener.run)
		p.wg.Start(listener.pop)
	}
}

Как показано, метод Handler в контроллере в конечном итоге будет добавлен к прослушивателю, а прослушиватель будет присоединен кsharedProcessorв срезе Listeners.

Как упоминалось ранее,sharedIndexInformerначнетsharedProcessorбежать, иsharedProcessorЛогика запуска аналогична этимlistenerСвязанный:

func (p *sharedProcessor) run(stopCh <-chan struct{}) {
	func() {
		p.listenersLock.RLock()
		defer p.listenersLock.RUnlock()
		for _, listener := range p.listeners {
			p.wg.Start(listener.run)
			p.wg.Start(listener.pop)
		}
		p.listenersStarted = true
	}()
	<-stopCh
	p.listenersLock.RLock()
	defer p.listenersLock.RUnlock()
	for _, listener := range p.listeners {
		close(listener.addCh) // Tell .pop() to stop. .pop() will tell .run() to stop
	}
	p.wg.Wait() // Wait for all .pop() and .run() to stop
}

можно увидеть,sharedProcessorбудет выполняться последовательно при запускеlistenerизrunиpopметоды, мы теперь рассмотрим эти два метода.

начало слушателя

Поскольку прослушиватель содержит зарегистрированный контроллер метода Handler, поэтому прослушиватель является наиболее важной функцией, когда происходит событие, запускающее эти методы, иlistener.runэто держаться отnextChЭтот канал получает событие и выполняет соответствующий обработчик:

func (p *processorListener) run() {
	// this call blocks until the channel is closed.  When a panic happens during the notification
	// we will catch it, **the offending item will be skipped!**, and after a short delay (one second)
	// the next notification will be attempted.  This is usually better than the alternative of never
	// delivering again.
	stopCh := make(chan struct{})
	wait.Until(func() {
		// this gives us a few quick retries before a long pause and then a few more quick retries
		err := wait.ExponentialBackoff(retry.DefaultRetry, func() (bool, error) {
			for next := range p.nextCh {
				switch notification := next.(type) {
				case updateNotification:
					p.handler.OnUpdate(notification.oldObj, notification.newObj)
				case addNotification:
					p.handler.OnAdd(notification.newObj)
				case deleteNotification:
					p.handler.OnDelete(notification.oldObj)
				default:
					utilruntime.HandleError(fmt.Errorf("unrecognized notification: %T", next))
				}
			}
			// the only way to get here is if the p.nextCh is empty and closed
			return true, nil
		})

		// the only way to get here is if the p.nextCh is empty and closed
		if err == nil {
			close(stopCh)
		}
	}, 1*time.Minute, stopCh)
}

можно увидеть,listener.runнепрерывно отnextChПолучайте события в этом канале, ноnextChОткуда взялись события на этом канале?listener.popнесет ответственность за размещение события вnextChсередина.

listener.popочень тонкая и интересная логика:

func (p *processorListener) pop() {
	defer utilruntime.HandleCrash()
	defer close(p.nextCh) // Tell .run() to stop

	var nextCh chan<- interface{}
	var notification interface{}
	for {
		select {
		case nextCh <- notification:
			// Notification dispatched
			var ok bool
			notification, ok = p.pendingNotifications.ReadOne()
			if !ok { // Nothing to pop
				nextCh = nil // Disable this select case
			}
		case notificationToAdd, ok := <-p.addCh:
			if !ok {
				return
			}
			if notification == nil { // No notification to pop (and pendingNotifications is empty)
				// Optimize the case - skip adding to pendingNotifications
				notification = notificationToAdd
				nextCh = p.nextCh
			} else { // There is already a notification waiting to be dispatched
				p.pendingNotifications.WriteOne(notificationToAdd)
			}
		}
	}
}

listenerПричина, по которой включены два канала:addChиnextCh, потому что Информер непредсказуемlistener.handlerЕсли скорость потребления событий больше, чем скорость производства событий, тоpendingNotificationsБуферная очередь для хранения событий, которые будут использоваться в будущем.

popС одной стороны, метод будет постоянно меняться отaddChчтобы получить последние события, чтобы гарантировать, что производитель не заблокируется. Затем судите, есть ли буфер, если он есть, добавьте событие в буфер, если нет, попробуйте запушить его вnextCh.

С другой стороны, он будет судить о том, есть ли еще события в буфере.nextCh.

popМетод реализует механизм распределения с буфером, так что события могут непрерывно отправляться изaddChприбытьnextCh.但是问题来了,那addChОткуда произошло событие.

На самом деле источник очень простой,listenerсуществует одинaddметод, входным параметром является событие, метод поместит новое событие вaddChсередина. и позвонитеaddПуть в том, чтобы управлять всемиlistenerизsharedProcessor.

Как указано выше,sharedProcessorОтветственность состоит в том, чтобы управлять всеми обработчиками и распространять события, а реальная работа по распространениюdistributeметод:

func (p *sharedProcessor) distribute(obj interface{}, sync bool) {
	p.listenersLock.RLock()
	defer p.listenersLock.RUnlock()

	if sync {
		for _, listener := range p.syncingListeners {
			listener.add(obj)
		}
	} else {
		for _, listener := range p.listeners {
			listener.add(obj)
		}
	}
}

На данный момент у нас есть немного ясности:

  1. Контроллер регистрирует обработчик в Информере
  2. Информер отsharedProcessorПоддерживается весь обработчик (слушатель)
  3. Когда Информер получает событие, передатьsharedProcessor.distributeраспространять событие
  4. Контроллер запускается соответствующим обработчиком для обработки собственной логики.

Итак, остается вопрос, откуда берутся события Informer?

DeltaFIFO

Прежде чем анализировать событие приобретения Informer, необходимо заранее упомянуть очень интересный небольшой инструмент, а именноsharedIndexInformer.RunСозданныйfifoочередь:

fifo := NewDeltaFIFO(MetaNamespaceKeyFunc, s.indexer)

DeltaFIFO — очень интересная очередь, соответствующий код определен вclient-go/tools/cache/delta_fifo.goсередина. Для очереди наиболее важными должны быть метод Add и метод Pop.DeltaFIFO предоставляет несколько методов Add.Хотя разные методы различаются в соответствии с разными типами событий (добавление/обновление/удаление/синхронизация), в конечном итоге они будут выполняться.queueActionLocked:

// queueActionLocked appends to the delta list for the object.
// Caller must lock first.
func (f *DeltaFIFO) queueActionLocked(actionType DeltaType, obj interface{}) error {
	id, err := f.KeyOf(obj)
	if err != nil {
		return KeyError{obj, err}
	}

	// If object is supposed to be deleted (last event is Deleted),
	// then we should ignore Sync events, because it would result in
	// recreation of this object.
	if actionType == Sync && f.willObjectBeDeletedLocked(id) {
		return nil
	}

	newDeltas := append(f.items[id], Delta{actionType, obj})
	newDeltas = dedupDeltas(newDeltas)

	if len(newDeltas) > 0 {
		if _, exists := f.items[id]; !exists {
			f.queue = append(f.queue, id)
		}
		f.items[id] = newDeltas
		f.cond.Broadcast()
	} else {
		// We need to remove this from our map (extra items in the queue are
		// ignored if they are not in the map).
		delete(f.items, id)
	}
	return nil
}

queueActionLockedПервый параметр actionType метода — это тип события:

const (
	Added   DeltaType = "Added"   // watch api 获得的创建事件
	Updated DeltaType = "Updated" // watch api 获得的更新事件
	Deleted DeltaType = "Deleted" // watch api 获得的删除事件
	Sync DeltaType = "Sync"       // 触发了 List Api,需要刷新缓存
)

Из типа события и метода постановки в очередь видно, что это очередь с бизнес-функциями, а не просто "первым пришел - первым обслужен". В методе постановки в очередь есть две очень тонкие конструкции:

  1. События, входящие в очередь, сначала определит, есть ли непромокаемые события в ресурсе, а затем обрабатывают их соответствующим образом
  2. Если ресурс был удален, если ресурс уже удален, больше не обрабатывается.

Второй момент понять несложно: если сработал запрос List и ресурс, подлежащий обработке, оказался удаленным, то его не нужно повторно ставить в очередь на обработку. Первый пункт необходимо объединить с методом удаления из очереди, чтобы посмотреть:

func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error) {
	f.lock.Lock()
	defer f.lock.Unlock()
	for {
		for len(f.queue) == 0 {
			// When the queue is empty, invocation of Pop() is blocked until new item is enqueued.
			// When Close() is called, the f.closed is set and the condition is broadcasted.
			// Which causes this loop to continue and return from the Pop().
			if f.IsClosed() {
				return nil, ErrFIFOClosed
			}

			f.cond.Wait()
		}
		id := f.queue[0]
		f.queue = f.queue[1:]
		if f.initialPopulationCount > 0 {
			f.initialPopulationCount--
		}
		item, ok := f.items[id]
		if !ok {
			// Item may have been deleted subsequently.
			continue
		}
		delete(f.items, id)
		err := process(item)
		if e, ok := err.(ErrRequeue); ok {
			f.addIfNotPresent(id, item)
			err = e.Err
		}
		// Don't need to copyDeltas here, because we're transferring
		// ownership to the caller.
		return item, err
	}
}

DeltaFIFOPopУ метода есть входной параметр, который является функцией обработки.Когда очередь закончится, DeltaFIFO сначала получит ресурс в соответствии с идентификатором ресурсавсе события, а затем передать его функции-обработчику.

Как показано на фиговом рабочем процессе:

В целом, метод постановки в очередь DeltaFIFO сначала определит, находится ли ресурс уже в очереди.items, если оно уже существует, указывая на то, что ресурс не был потреблен (все еще находится в очереди в очереди), а затем напрямую добавляет событие кitems[resource_id]в. Если не найденоitems, это создастitems[resource_id]и добавьте идентификатор ресурса кqueueсередина.

А метод DeltaFIFO вне очереди будетqueueПолучить переднюю часть очереди, затем отitemsЗаберите все события ресурса и, наконец, вызовитеPopметод передан вPopProcessFuncТип функции-обработчика.

Таким образом, характеристика DeltaFIFO заключается в том, что в очередь помещаются события (ресурса), а когда он удаляется из очереди, получаются все события ресурса, которые были поставлены в очередь первыми. Этот дизайн гарантирует, что не будет голодания из-за сумасшедшего производственного события определенного ресурса, из-за которого другие ресурсы не будут обработаны.

controller

DeltaFIFO является очень важным компонентом, что делает его действительно ценным, так этоcontroller.

Хотя использование исходного кода K8Scontrollerслово, но этоcontrollerЭто не контроллер ресурсов, такой как контроллер развертывания. Вместо этого это контроллер событий, который связывает предыдущее и следующее (получает событие от API-сервера и отправляет его на Информер для обработки).

controllerЕсть две обязанности:

  1. Получайте события с сервера Api через List-Watch и отправляйте событие в DeltaFIFO.
  2. будетsharedIndexInformerизHandleDeltasПоп-методы как параметр вызова DeltaFIFO

controllerОпределение очень простое, его сутьReflector:

type controller struct {
	config         Config
	reflector      *Reflector
	reflectorMutex sync.RWMutex
	clock          clock.Clock
}

ReflectorКод громоздкий, но функция относительно проста, что определяется с помощью sharedIndexInformer.listerWatcherВыполните List-Watch и поместите полученные события в DeltaFIFO.

controllerПосле запускаReflectorзапустить, затем выполнитьprocessLoop, через бесконечный цикл события ресурсов, которые необходимо обработать, будут непрерывно считываться из DeltaFIFO и отправляться вsharedIndexInformerизHandleDeltasметод (создатьcontrollerкогда назначен наconfig.Process).

func (c *controller) processLoop() {
	for {
		obj, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process))
		if err != nil {
			if err == ErrFIFOClosed {
				return
			}
			if c.config.RetryOnError {
				// This is the safe way to re-enqueue.
				c.config.Queue.AddIfNotPresent(obj)
			}
		}
	}
}

Если мы еще раз посмотрим на метод HandleDeltas в sharedIndexInformer, мы обнаружим, что весь процесс потребления событий открыт:

func (s *sharedIndexInformer) HandleDeltas(obj interface{}) error {
	s.blockDeltas.Lock()
	defer s.blockDeltas.Unlock()

	// from oldest to newest
	for _, d := range obj.(Deltas) {
		switch d.Type {
		case Sync, Added, Updated:
			isSync := d.Type == Sync
			s.cacheMutationDetector.AddObject(d.Object)
			if old, exists, err := s.indexer.Get(d.Object); err == nil && exists {
				if err := s.indexer.Update(d.Object); err != nil {
					return err
				}
				s.processor.distribute(updateNotification{oldObj: old, newObj: d.Object}, isSync)
			} else {
				if err := s.indexer.Add(d.Object); err != nil {
					return err
				}
				s.processor.distribute(addNotification{newObj: d.Object}, isSync)
			}
		case Deleted:
			if err := s.indexer.Delete(d.Object); err != nil {
				return err
			}
			s.processor.distribute(deleteNotification{oldObj: d.Object}, false)
		}
	}
	return nil
}

Мы знали ранееprocessor.distributeметод отправлять событие всемlistenercontrollerБудет использоватьсяReflectorПолучите событие от ApiServer, поместите его в очередь, а затем передайтеprocessLoopВозьмите все события для обработки ресурса из очереди и, наконец, передайтеsharedIndexInformerизHandleDeltasметод, называемыйprocessor.distribute.

Таким образом, мы можем организовать весь поток событий в виде следующей диаграммы:

Indexer

Выше мы разобрали событие от приема до раздачи, и разобрали всю логику посередине, но в методе HandleDeltas у sharedIndexInformer есть какая-то более заметная логика, то есть все события будут обрабатываться первый.s.indexerОбновляйте и распространяйте.

Как упоминалось ранее, Indexer — это потокобезопасное хранилище, используемое в качестве кеша, чтобы уменьшить нагрузку на ApiServer, когда контроллер ресурсов (Controller) запрашивает ресурсы.

Когда какое-либо событие обновляется, кеш в индексаторе будет сначала обновлен, а затем событие будет передано контроллеру ресурсов. ненужные запросы к APIServer.

Конкретная реализация хранилища индексатора находится в client-go/tools/cache/thread_safe_store.go, а данные хранятся вthreadSafeMapсередина:

type threadSafeMap struct {
	lock  sync.RWMutex
	items map[string]interface{}

	// indexers maps a name to an IndexFunc
	indexers Indexers
	// indices maps a name to an Index
	indices Indices
}

По сути,threadSafeMapЭто карта с блокировкой чтения-записи. В дополнение к этому также могут быть определены индексы.Реализация индексов очень интересна и выполняется через два поля:

  1. IndexersЭто карта, определяющая несколько функций индексирования, ключ — indexName, а значение — функция индексирования (значение индекса вычислительного ресурса).
  2. IndicesОтношение сопоставления между значением индекса и ключом данных сохраняется,Indices— двухслойная карта, ключ первого слоя — indexName, аIndexersСоответственно, определите, какой метод использовать для вычисления значения индекса.Значение представляет собой карту, сохраняющую связь между "значение индекса-ключ ресурса".

Логика корреляции относительно проста, как показано на рисунке 17:

MutationDetector

sharedIndexInformerизHandleDeltasметод, за исключениемs.indexerПомимо обновленных данных,s.cacheMutationDetectorОбновлены данные.

сказал в началеsharedIndexInformerТакже начинаетcacheMutationDetector, чтобы контролировать кеш индексатора.

Поскольку кэш-индексатор на самом деле является указателем, несколько контроллеров доступа к кэшированным ресурсам индексатора, фактически получают тот же экземпляр ресурса. Если есть контроллер, который не выполняет свой долг и модифицирует свойства ресурса, он неизбежно повлияет на правильность других контроллеров.

Роль MutationDetector заключается в том, чтобы периодически проверять, был ли модифицирован кеш.Когда Informer получает новое событие, MutationDetector сохраняет указатель на ресурс (тот же, что и индексатор) и глубокую копию ресурса. Периодически проверяя, согласуется ли ресурс, на который указывает указатель, с глубокой копией, сохраненной в начале, мы знаем, был ли изменен кэшированный ресурс.

Однако то, включен ли мониторинг, зависит от переменных среды.KUBE_CACHE_MUTATION_DETECTORЗатронуто: если эта переменная среды не задана, создается экземпляр sharedIndexInformer.dummyMutationDetector, ничего не делает после запуска.

еслиKUBE_CACHE_MUTATION_DETECTORверно, созданный экземпляр sharedIndexInformerdefaultCacheMutationDetector, экземпляр будет периодически проверять кеш с интервалом в 1 с. Если обнаружится, что кеш изменен, он вызовет функцию обработчика сбоя, а если функция не определена, вызовет панику.

Суммировать

То, что объясняется в этой статье, следует рассматривать как диспетчер контроллеров в узком смысле, ведь она не включает конкретного диспетчера ресурсов (контроллера), а только объясняет, как диспетчер контроллеров «управляет контроллером».

Видно, что диспетчер контроллера проделал большую работу, чтобы контроллер мог сосредоточиться только на обработке событий, которые его волнуют, и ядром этой работы является информер.Когда вы понимаете, как информер работает с другими компонентов, то Controller Manager является основой для менеджера ресурсов.Стало понятно, что произошло.