Статья «Как работает диспетчер контроллеров Kubernetes» была впервые опубликована по адресу:blog.I и PO.net/15763910382…
Эта статья основана на чтении исходного кода Kubernetes v1.16, В статье есть некий исходный код, но я постараюсь описать его как можно понятнее с помощью прилагаемых картинок.
В главном узле Kubernetes есть три важных компонента: ApiServer, ControllerManager и Scheduler, которые вместе берут на себя управление всем кластером. В этой статье делается попытка разобраться в рабочем процессе и принципах ControllerManager.
Что такое диспетчер контроллеров
Согласно официальной документации: kube-controller-manager запускает контроллеры — фоновые потоки, обрабатывающие рутинные задачи в кластере.
Проще говоря, Controller Manager — это центр управления и контроля в кластере, состоящий из нескольких контроллеров, отвечающих за разные ресурсы, и совместно отвечающий за управление всеми ресурсами, такими как Node и Pod в кластере.Например, когда Pod, созданный Deployment, аварийно завершает работу, RS Controller примет и обработает событие выхода и создаст новый Pod для поддержки ожидаемого количества реплик.
Почти каждый конкретный ресурс имеет определенный контроллер для поддержки и управления для поддержания ожидаемого состояния, а ответственностью диспетчера контроллеров является объединение всех контроллеров:
- Обеспечьте инфраструктуру для снижения сложности реализации контроллера.
- Запустить и поддерживать нормальную работу контроллера
Можно сказать, что контроллер гарантирует, что ресурсы внутри кластера остаются в ожидаемом состоянии, а диспетчер контроллера гарантирует, что контроллер остается в ожидаемом состоянии.
Рабочий процесс контроллера
Прежде чем объяснить, как менеджер контроллера обеспечивает инфраструктуру и операционную среду для контроллера, давайте сначала понять, как выглядит рабочий процесс контроллера.
С многомерной точки зрения диспетчер контроллеров в основном предоставляет возможность распределять события, а различным контроллерам нужно только зарегистрировать соответствующие обработчики для ожидания получения и обработки событий.
Возьмем, к примеру, контроллер развертывания вpkg/controller/deployment/deployment_controller.go
изNewDeploymentController
Этот метод включает регистрацию обработчика событий.Для контроллера развертывания требуется только реализовать различную логику обработки в соответствии с различными событиями для управления соответствующими ресурсами.
dInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: dc.addDeployment,
UpdateFunc: dc.updateDeployment,
// This will enter the sync loop and no-op, because the deployment has been deleted from the store.
DeleteFunc: dc.deleteDeployment,
})
rsInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: dc.addReplicaSet,
UpdateFunc: dc.updateReplicaSet,
DeleteFunc: dc.deleteReplicaSet,
})
podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
DeleteFunc: dc.deletePod,
})
Видно, что с помощью диспетчера контроллеров логика контроллера может быть очень чистой, просто нужно реализовать соответствующий EventHandler, тогда какую конкретную работу выполняет диспетчер контроллеров?
Архитектура диспетчера контроллеров
Вспомогательным диспетчером контроллеров для завершения распределения событий является client-go, но одним из наиболее важных модулей является информер.
Kubernetes предоставляет схему архитектуры client-go на github, видно, что Controller — это именно то, что описывает нижняя часть (CustomController), а Controller Manager в основном дополняет верхнюю часть.
Фабрика Информеров
Как видно из приведенного выше рисунка, Informer — это очень важная роль «моста», поэтому управление Informer — это первое, что должен сделать Controller Manager.
Когда Controller Manager запускается, он создаетSharedInformerFactory
Поскольку каждый информер будет поддерживать длительное соединение с сервером Api, эта одноэлементная фабрика гарантирует, что каждый тип информера создается только один раз, предоставляя уникальную запись для всех контроллеров для получения информера.
Логика инициализации singleton plant:
// NewSharedInformerFactoryWithOptions constructs a new instance of a SharedInformerFactory with additional options.
func NewSharedInformerFactoryWithOptions(client kubernetes.Interface, defaultResync time.Duration, options ...SharedInformerOption) SharedInformerFactory {
factory := &sharedInformerFactory{
client: client,
namespace: v1.NamespaceAll,
defaultResync: defaultResync,
informers: make(map[reflect.Type]cache.SharedIndexInformer),
startedInformers: make(map[reflect.Type]bool),
customResync: make(map[reflect.Type]time.Duration),
}
// Apply all options
for _, opt := range options {
factory = opt(factory)
}
return factory
}
Как видно из приведенной выше логики инициализации,sharedInformerFactory
Важнейшим из них являетсяinformers
Карта, где ключ — тип ресурса, а значение — информер, относящийся к типу ресурса. Каждый тип Информера будет создан только один раз и сохранен на карте.Разные Контроллеры будут получать один и тот же экземпляр Информера только тогда, когда им нужен один и тот же Информер ресурсов.
Для диспетчера контроллеров поддержка всех информеров для обеспечения их нормальной работы является основным условием обеспечения нормальной работы всех контроллеров.sharedInformerFactory
Все экземпляры информера поддерживаются через эту карту, поэтомуsharedInformerFactory
Он также берет на себя ответственность за предоставление единой стартовой записи:
// Start initializes all requested informers.
func (f *sharedInformerFactory) Start(stopCh <-chan struct{}) {
f.lock.Lock()
defer f.lock.Unlock()
for informerType, informer := range f.informers {
if !f.startedInformers[informerType] {
go informer.Run(stopCh)
f.startedInformers[informerType] = true
}
}
}
Когда начинается менеджер контроллера, самое главное - через заводStart
Способы запуска всех Информера.
Создание Информера
Давайте посмотрим, как создаются эти Информеры. Менеджер контроллера вcmd/kube-controller-manager/app/controllermanager.go
изNewControllerInitializers
Все контроллеры изначально распознаются в функции.Из-за длинного кода здесь мы берем в качестве примера только контроллер развертывания.
Логика инициализации контроллера развертывания находится вcmd/kube-controller-manager/app/apps.go
изstartDeploymentController
в функции:
func startDeploymentController(ctx ControllerContext) (http.Handler, bool, error) {
if !ctx.AvailableResources[schema.GroupVersionResource{Group: "apps", Version: "v1", Resource: "deployments"}] {
return nil, false, nil
}
dc, err := deployment.NewDeploymentController(
ctx.InformerFactory.Apps().V1().Deployments(),
ctx.InformerFactory.Apps().V1().ReplicaSets(),
ctx.InformerFactory.Core().V1().Pods(),
ctx.ClientBuilder.ClientOrDie("deployment-controller"),
)
if err != nil {
return nil, true, fmt.Errorf("error creating Deployment controller: %v", err)
}
go dc.Run(int(ctx.ComponentConfig.DeploymentController.ConcurrentDeploymentSyncs), ctx.Stop)
return nil, true, nil
}
Наиболее критична логикаdeployment.NewDeploymentController
Выше функция фактически создает контроллер развертывания, а первые три параметра функции создания — это информер развертывания, набор реплик и под. Как видите, singleton factory Информера предоставляет запись о создании Информера для разных ресурсов с ApiGroup в качестве пути.
Обратите внимание, однако, что.Apps().V1().Deployments()
Хотя вернувшийсяdeploymentInformer
экземпляр типа, однако,deploymentInformer
На самом деле, это не настоящий Информер (хотя он и назван в честь Информера), это просто класс-шаблон, основной функцией которого является предоставление шаблона для создания конкретного Информера ресурса, ориентированного на Развертывание:
// Deployments returns a DeploymentInformer.
func (v *version) Deployments() DeploymentInformer {
return &deploymentInformer{factory: v.factory, namespace: v.namespace, tweakListOptions: v.tweakListOptions}
}
Настоящая логика создания Информера находится вdeploymentInformer.Informer()
середина(client-go/informers/apps/v1/deployment.go
),f.defaultInformer
Deployment Informer по умолчанию создает метод шаблона, передавая экземпляр ресурса и метод шаблона в фабрику Informer.InformerFor
метод для создания Informer, который фокусируется только на ресурсах развертывания:
func (f *deploymentInformer) Informer() cache.SharedIndexInformer {
return f.factory.InformerFor(&appsv1.Deployment{}, f.defaultInformer)
}
Кратко резюмируем:
- Определенный тип класса шаблона Informer можно получить через фабрику Informer (т.е. здесь
deploymentInformer
) - Что на самом деле создает этот конкретный ресурс Informer, так это класс шаблона Informer.
Informer()
метод - и
Informer()
Методы просто пропускаются через информер заводInformerFor
Создать истинный информер
Здесь используется шаблонный метод (паттерн проектирования).Хотя здесь есть некоторая хитрость, но вы можете обратиться к рисунку ниже, чтобы разобраться.Ключ к пониманию кроется в ИнформереЛогика дифференцированного создания делегирована классам шаблонов:
Наконец, назвалsharedIndexInformer
Структура будет создана и фактически возьмет на себя обязанности Informer. Этот экземпляр также зарегистрирован на заводской карте Informer.
Работа Информера
Потому что настоящий информер экземпляр являетсяsharedIndexInformer
Объект типа, при запуске фабрики Информера (выполняетсяStart
метод), на самом деле выполняетсяsharedIndexInformer
.
sharedIndexInformer
является компонентом client-go, егоRun
Хотя метод состоит всего из нескольких десятков строк, он проделал большую работу. А вот и самая интересная часть Controller Manager.
func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) {
defer utilruntime.HandleCrash()
fifo := NewDeltaFIFO(MetaNamespaceKeyFunc, s.indexer)
cfg := &Config{
Queue: fifo,
ListerWatcher: s.listerWatcher,
ObjectType: s.objectType,
FullResyncPeriod: s.resyncCheckPeriod,
RetryOnError: false,
ShouldResync: s.processor.shouldResync,
Process: s.HandleDeltas,
}
func() {
s.startedLock.Lock()
defer s.startedLock.Unlock()
s.controller = New(cfg)
s.controller.(*controller).clock = s.clock
s.started = true
}()
// Separate stop channel because Processor should be stopped strictly after controller
processorStopCh := make(chan struct{})
var wg wait.Group
defer wg.Wait() // Wait for Processor to stop
defer close(processorStopCh) // Tell Processor to stop
wg.StartWithChannel(processorStopCh, s.cacheMutationDetector.Run)
wg.StartWithChannel(processorStopCh, s.processor.run)
defer func() {
s.startedLock.Lock()
defer s.startedLock.Unlock()
s.stopped = true // Don't want any new listeners
}()
s.controller.Run(stopCh)
}
sharedIndexInformer
Логика запуска в основном выполняет следующие действия:
- создал имя
fifo
очередь - Создал и запустил
controller
экземпляр - началось
cacheMutationDetector
- началось
processor
Эти термины (или компоненты) не были упомянуты ранее, и эти четыре вещи являются основным содержанием работы диспетчера контроллера, поэтому я введем их отдельно ниже.
sharedIndexInformer
sharedIndexInformer
Informer - это общий кадр, другой контроллер нужно предоставить только класс шаблона (например, упомянутый вышеdeploymentInformer
), вы можете создать специальный Информер, который соответствует вашим потребностям.
sharedIndexInformer
Содержит кучу инструментов для выполнения задачи Информера, его основной код находится вclient-go/tools/cache/shared_informer.go
середина. Логика его создания тоже в нем:
// NewSharedIndexInformer creates a new instance for the listwatcher.
func NewSharedIndexInformer(lw ListerWatcher, objType runtime.Object, defaultEventHandlerResyncPeriod time.Duration, indexers Indexers) SharedIndexInformer {
realClock := &clock.RealClock{}
sharedIndexInformer := &sharedIndexInformer{
processor: &sharedProcessor{clock: realClock},
indexer: NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, indexers),
listerWatcher: lw,
objectType: objType,
resyncCheckPeriod: defaultEventHandlerResyncPeriod,
defaultEventHandlerResyncPeriod: defaultEventHandlerResyncPeriod,
cacheMutationDetector: NewCacheMutationDetector(fmt.Sprintf("%T", objType)),
clock: realClock,
}
return sharedIndexInformer
}
В логике создания есть несколько вещей, о которых нужно знать:
- процессор: Обеспечивает функцию регистрации EventHandler и распределения событий
- Индексатор: обеспечивает функцию кэширования ресурсов
- listerWatcher: предоставляется классом шаблона, который содержит определенный ресурс и метод Watch List.
- objectType: используется для обозначения конкретного типа ресурса.
- cacheMutationDetector: Мониторинг кеша Informer
Кроме того, он также включает в себя логику запуска, упомянутую выше.DeltaFIFO
очередь иcontroller
, которые представлены отдельно ниже.
sharedProcessor
Процессор — очень интересный компонент в sharedIndexInformer.Диспетчер контроллеров использует фабрику singleton Informer, чтобы гарантировать, что разные контроллеры используют один и тот же информер, но разные контроллеры имеют разные обработчики, зарегистрированные для общего информера, так как же информер должен управлять зарегистрированным информером? Что с Хэндлером?
Процессор — это компонент, используемый для управления зарегистрированными обработчиками и распределения событий по разным обработчикам.
type sharedProcessor struct {
listenersStarted bool
listenersLock sync.RWMutex
listeners []*processorListener
syncingListeners []*processorListener
clock clock.Clock
wg wait.Group
}
Суть работы sharedProcessor вращается вокругlisteners
Этот фрагмент прослушивателя расширяется.
Когда мы регистрируем Handler в Informer, он в конечном итоге будет преобразован вprocessorListener
Примеры структур:
func newProcessListener(handler ResourceEventHandler, requestedResyncPeriod, resyncPeriod time.Duration, now time.Time, bufferSize int) *processorListener {
ret := &processorListener{
nextCh: make(chan interface{}),
addCh: make(chan interface{}),
handler: handler,
pendingNotifications: *buffer.NewRingGrowing(bufferSize),
requestedResyncPeriod: requestedResyncPeriod,
resyncPeriod: resyncPeriod,
}
ret.determineNextResync(now)
return ret
}
Этот экземпляр в основном содержит два канала и методы Handler, зарегистрированные снаружи. И здесь создан экземплярprocessorListener
В конечном итоге объект будет добавлен вsharedProcessor.listeners
Список:
func (p *sharedProcessor) addListener(listener *processorListener) {
p.listenersLock.Lock()
defer p.listenersLock.Unlock()
p.addListenerLocked(listener)
if p.listenersStarted {
p.wg.Start(listener.run)
p.wg.Start(listener.pop)
}
}
Как показано, метод Handler в контроллере в конечном итоге будет добавлен к прослушивателю, а прослушиватель будет присоединен кsharedProcessor
в срезе Listeners.
Как упоминалось ранее,sharedIndexInformer
начнетsharedProcessor
бежать, иsharedProcessor
Логика запуска аналогична этимlistener
Связанный:
func (p *sharedProcessor) run(stopCh <-chan struct{}) {
func() {
p.listenersLock.RLock()
defer p.listenersLock.RUnlock()
for _, listener := range p.listeners {
p.wg.Start(listener.run)
p.wg.Start(listener.pop)
}
p.listenersStarted = true
}()
<-stopCh
p.listenersLock.RLock()
defer p.listenersLock.RUnlock()
for _, listener := range p.listeners {
close(listener.addCh) // Tell .pop() to stop. .pop() will tell .run() to stop
}
p.wg.Wait() // Wait for all .pop() and .run() to stop
}
можно увидеть,sharedProcessor
будет выполняться последовательно при запускеlistener
изrun
иpop
методы, мы теперь рассмотрим эти два метода.
начало слушателя
Поскольку прослушиватель содержит зарегистрированный контроллер метода Handler, поэтому прослушиватель является наиболее важной функцией, когда происходит событие, запускающее эти методы, иlistener.run
это держаться отnextCh
Этот канал получает событие и выполняет соответствующий обработчик:
func (p *processorListener) run() {
// this call blocks until the channel is closed. When a panic happens during the notification
// we will catch it, **the offending item will be skipped!**, and after a short delay (one second)
// the next notification will be attempted. This is usually better than the alternative of never
// delivering again.
stopCh := make(chan struct{})
wait.Until(func() {
// this gives us a few quick retries before a long pause and then a few more quick retries
err := wait.ExponentialBackoff(retry.DefaultRetry, func() (bool, error) {
for next := range p.nextCh {
switch notification := next.(type) {
case updateNotification:
p.handler.OnUpdate(notification.oldObj, notification.newObj)
case addNotification:
p.handler.OnAdd(notification.newObj)
case deleteNotification:
p.handler.OnDelete(notification.oldObj)
default:
utilruntime.HandleError(fmt.Errorf("unrecognized notification: %T", next))
}
}
// the only way to get here is if the p.nextCh is empty and closed
return true, nil
})
// the only way to get here is if the p.nextCh is empty and closed
if err == nil {
close(stopCh)
}
}, 1*time.Minute, stopCh)
}
можно увидеть,listener.run
непрерывно отnextCh
Получайте события в этом канале, ноnextCh
Откуда взялись события на этом канале?listener.pop
несет ответственность за размещение события вnextCh
середина.
listener.pop
очень тонкая и интересная логика:
func (p *processorListener) pop() {
defer utilruntime.HandleCrash()
defer close(p.nextCh) // Tell .run() to stop
var nextCh chan<- interface{}
var notification interface{}
for {
select {
case nextCh <- notification:
// Notification dispatched
var ok bool
notification, ok = p.pendingNotifications.ReadOne()
if !ok { // Nothing to pop
nextCh = nil // Disable this select case
}
case notificationToAdd, ok := <-p.addCh:
if !ok {
return
}
if notification == nil { // No notification to pop (and pendingNotifications is empty)
// Optimize the case - skip adding to pendingNotifications
notification = notificationToAdd
nextCh = p.nextCh
} else { // There is already a notification waiting to be dispatched
p.pendingNotifications.WriteOne(notificationToAdd)
}
}
}
}
listener
Причина, по которой включены два канала:addCh
иnextCh
, потому что Информер непредсказуемlistener.handler
Если скорость потребления событий больше, чем скорость производства событий, тоpendingNotifications
Буферная очередь для хранения событий, которые будут использоваться в будущем.
pop
С одной стороны, метод будет постоянно меняться отaddCh
чтобы получить последние события, чтобы гарантировать, что производитель не заблокируется. Затем судите, есть ли буфер, если он есть, добавьте событие в буфер, если нет, попробуйте запушить его вnextCh
.
С другой стороны, он будет судить о том, есть ли еще события в буфере.nextCh
.
pop
Метод реализует механизм распределения с буфером, так что события могут непрерывно отправляться изaddCh
прибытьnextCh
.但是问题来了,那addCh
Откуда произошло событие.
На самом деле источник очень простой,listener
существует одинadd
метод, входным параметром является событие, метод поместит новое событие вaddCh
середина. и позвонитеadd
Путь в том, чтобы управлять всемиlistener
изsharedProcessor
.
Как указано выше,sharedProcessor
Ответственность состоит в том, чтобы управлять всеми обработчиками и распространять события, а реальная работа по распространениюdistribute
метод:
func (p *sharedProcessor) distribute(obj interface{}, sync bool) {
p.listenersLock.RLock()
defer p.listenersLock.RUnlock()
if sync {
for _, listener := range p.syncingListeners {
listener.add(obj)
}
} else {
for _, listener := range p.listeners {
listener.add(obj)
}
}
}
На данный момент у нас есть немного ясности:
- Контроллер регистрирует обработчик в Информере
- Информер от
sharedProcessor
Поддерживается весь обработчик (слушатель) - Когда Информер получает событие, передать
sharedProcessor.distribute
распространять событие - Контроллер запускается соответствующим обработчиком для обработки собственной логики.
Итак, остается вопрос, откуда берутся события Informer?
DeltaFIFO
Прежде чем анализировать событие приобретения Informer, необходимо заранее упомянуть очень интересный небольшой инструмент, а именноsharedIndexInformer.Run
Созданныйfifo
очередь:
fifo := NewDeltaFIFO(MetaNamespaceKeyFunc, s.indexer)
DeltaFIFO — очень интересная очередь, соответствующий код определен вclient-go/tools/cache/delta_fifo.go
середина. Для очереди наиболее важными должны быть метод Add и метод Pop.DeltaFIFO предоставляет несколько методов Add.Хотя разные методы различаются в соответствии с разными типами событий (добавление/обновление/удаление/синхронизация), в конечном итоге они будут выполняться.queueActionLocked
:
// queueActionLocked appends to the delta list for the object.
// Caller must lock first.
func (f *DeltaFIFO) queueActionLocked(actionType DeltaType, obj interface{}) error {
id, err := f.KeyOf(obj)
if err != nil {
return KeyError{obj, err}
}
// If object is supposed to be deleted (last event is Deleted),
// then we should ignore Sync events, because it would result in
// recreation of this object.
if actionType == Sync && f.willObjectBeDeletedLocked(id) {
return nil
}
newDeltas := append(f.items[id], Delta{actionType, obj})
newDeltas = dedupDeltas(newDeltas)
if len(newDeltas) > 0 {
if _, exists := f.items[id]; !exists {
f.queue = append(f.queue, id)
}
f.items[id] = newDeltas
f.cond.Broadcast()
} else {
// We need to remove this from our map (extra items in the queue are
// ignored if they are not in the map).
delete(f.items, id)
}
return nil
}
queueActionLocked
Первый параметр actionType метода — это тип события:
const (
Added DeltaType = "Added" // watch api 获得的创建事件
Updated DeltaType = "Updated" // watch api 获得的更新事件
Deleted DeltaType = "Deleted" // watch api 获得的删除事件
Sync DeltaType = "Sync" // 触发了 List Api,需要刷新缓存
)
Из типа события и метода постановки в очередь видно, что это очередь с бизнес-функциями, а не просто "первым пришел - первым обслужен". В методе постановки в очередь есть две очень тонкие конструкции:
- События, входящие в очередь, сначала определит, есть ли непромокаемые события в ресурсе, а затем обрабатывают их соответствующим образом
- Если ресурс был удален, если ресурс уже удален, больше не обрабатывается.
Второй момент понять несложно: если сработал запрос List и ресурс, подлежащий обработке, оказался удаленным, то его не нужно повторно ставить в очередь на обработку. Первый пункт необходимо объединить с методом удаления из очереди, чтобы посмотреть:
func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error) {
f.lock.Lock()
defer f.lock.Unlock()
for {
for len(f.queue) == 0 {
// When the queue is empty, invocation of Pop() is blocked until new item is enqueued.
// When Close() is called, the f.closed is set and the condition is broadcasted.
// Which causes this loop to continue and return from the Pop().
if f.IsClosed() {
return nil, ErrFIFOClosed
}
f.cond.Wait()
}
id := f.queue[0]
f.queue = f.queue[1:]
if f.initialPopulationCount > 0 {
f.initialPopulationCount--
}
item, ok := f.items[id]
if !ok {
// Item may have been deleted subsequently.
continue
}
delete(f.items, id)
err := process(item)
if e, ok := err.(ErrRequeue); ok {
f.addIfNotPresent(id, item)
err = e.Err
}
// Don't need to copyDeltas here, because we're transferring
// ownership to the caller.
return item, err
}
}
DeltaFIFOPop
У метода есть входной параметр, который является функцией обработки.Когда очередь закончится, DeltaFIFO сначала получит ресурс в соответствии с идентификатором ресурсавсе события, а затем передать его функции-обработчику.
Как показано на фиговом рабочем процессе:
В целом, метод постановки в очередь DeltaFIFO сначала определит, находится ли ресурс уже в очереди.items
, если оно уже существует, указывая на то, что ресурс не был потреблен (все еще находится в очереди в очереди), а затем напрямую добавляет событие кitems[resource_id]
в. Если не найденоitems
, это создастitems[resource_id]
и добавьте идентификатор ресурса кqueue
середина.
А метод DeltaFIFO вне очереди будетqueue
Получить переднюю часть очереди, затем отitems
Заберите все события ресурса и, наконец, вызовитеPop
метод передан вPopProcessFunc
Тип функции-обработчика.
Таким образом, характеристика DeltaFIFO заключается в том, что в очередь помещаются события (ресурса), а когда он удаляется из очереди, получаются все события ресурса, которые были поставлены в очередь первыми. Этот дизайн гарантирует, что не будет голодания из-за сумасшедшего производственного события определенного ресурса, из-за которого другие ресурсы не будут обработаны.
controller
DeltaFIFO является очень важным компонентом, что делает его действительно ценным, так этоcontroller
.
Хотя использование исходного кода K8Scontroller
слово, но этоcontroller
Это не контроллер ресурсов, такой как контроллер развертывания. Вместо этого это контроллер событий, который связывает предыдущее и следующее (получает событие от API-сервера и отправляет его на Информер для обработки).
controller
Есть две обязанности:
- Получайте события с сервера Api через List-Watch и отправляйте событие в DeltaFIFO.
- будет
sharedIndexInformer
изHandleDeltas
Поп-методы как параметр вызова DeltaFIFO
controller
Определение очень простое, его сутьReflector
:
type controller struct {
config Config
reflector *Reflector
reflectorMutex sync.RWMutex
clock clock.Clock
}
Reflector
Код громоздкий, но функция относительно проста, что определяется с помощью sharedIndexInformer.listerWatcher
Выполните List-Watch и поместите полученные события в DeltaFIFO.
controller
После запускаReflector
запустить, затем выполнитьprocessLoop
, через бесконечный цикл события ресурсов, которые необходимо обработать, будут непрерывно считываться из DeltaFIFO и отправляться вsharedIndexInformer
изHandleDeltas
метод (создатьcontroller
когда назначен наconfig.Process
).
func (c *controller) processLoop() {
for {
obj, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process))
if err != nil {
if err == ErrFIFOClosed {
return
}
if c.config.RetryOnError {
// This is the safe way to re-enqueue.
c.config.Queue.AddIfNotPresent(obj)
}
}
}
}
Если мы еще раз посмотрим на метод HandleDeltas в sharedIndexInformer, мы обнаружим, что весь процесс потребления событий открыт:
func (s *sharedIndexInformer) HandleDeltas(obj interface{}) error {
s.blockDeltas.Lock()
defer s.blockDeltas.Unlock()
// from oldest to newest
for _, d := range obj.(Deltas) {
switch d.Type {
case Sync, Added, Updated:
isSync := d.Type == Sync
s.cacheMutationDetector.AddObject(d.Object)
if old, exists, err := s.indexer.Get(d.Object); err == nil && exists {
if err := s.indexer.Update(d.Object); err != nil {
return err
}
s.processor.distribute(updateNotification{oldObj: old, newObj: d.Object}, isSync)
} else {
if err := s.indexer.Add(d.Object); err != nil {
return err
}
s.processor.distribute(addNotification{newObj: d.Object}, isSync)
}
case Deleted:
if err := s.indexer.Delete(d.Object); err != nil {
return err
}
s.processor.distribute(deleteNotification{oldObj: d.Object}, false)
}
}
return nil
}
Мы знали ранееprocessor.distribute
метод отправлять событие всемlistener
,иcontroller
Будет использоватьсяReflector
Получите событие от ApiServer, поместите его в очередь, а затем передайтеprocessLoop
Возьмите все события для обработки ресурса из очереди и, наконец, передайтеsharedIndexInformer
изHandleDeltas
метод, называемыйprocessor.distribute
.
Таким образом, мы можем организовать весь поток событий в виде следующей диаграммы:
Indexer
Выше мы разобрали событие от приема до раздачи, и разобрали всю логику посередине, но в методе HandleDeltas у sharedIndexInformer есть какая-то более заметная логика, то есть все события будут обрабатываться первый.s.indexer
Обновляйте и распространяйте.
Как упоминалось ранее, Indexer — это потокобезопасное хранилище, используемое в качестве кеша, чтобы уменьшить нагрузку на ApiServer, когда контроллер ресурсов (Controller) запрашивает ресурсы.
Когда какое-либо событие обновляется, кеш в индексаторе будет сначала обновлен, а затем событие будет передано контроллеру ресурсов. ненужные запросы к APIServer.
Конкретная реализация хранилища индексатора находится в client-go/tools/cache/thread_safe_store.go, а данные хранятся вthreadSafeMap
середина:
type threadSafeMap struct {
lock sync.RWMutex
items map[string]interface{}
// indexers maps a name to an IndexFunc
indexers Indexers
// indices maps a name to an Index
indices Indices
}
По сути,threadSafeMap
Это карта с блокировкой чтения-записи. В дополнение к этому также могут быть определены индексы.Реализация индексов очень интересна и выполняется через два поля:
-
Indexers
Это карта, определяющая несколько функций индексирования, ключ — indexName, а значение — функция индексирования (значение индекса вычислительного ресурса). -
Indices
Отношение сопоставления между значением индекса и ключом данных сохраняется,Indices
— двухслойная карта, ключ первого слоя — indexName, аIndexers
Соответственно, определите, какой метод использовать для вычисления значения индекса.Значение представляет собой карту, сохраняющую связь между "значение индекса-ключ ресурса".
Логика корреляции относительно проста, как показано на рисунке 17:
MutationDetector
sharedIndexInformer
изHandleDeltas
метод, за исключениемs.indexer
Помимо обновленных данных,s.cacheMutationDetector
Обновлены данные.
сказал в началеsharedIndexInformer
Также начинаетcacheMutationDetector
, чтобы контролировать кеш индексатора.
Поскольку кэш-индексатор на самом деле является указателем, несколько контроллеров доступа к кэшированным ресурсам индексатора, фактически получают тот же экземпляр ресурса. Если есть контроллер, который не выполняет свой долг и модифицирует свойства ресурса, он неизбежно повлияет на правильность других контроллеров.
Роль MutationDetector заключается в том, чтобы периодически проверять, был ли модифицирован кеш.Когда Informer получает новое событие, MutationDetector сохраняет указатель на ресурс (тот же, что и индексатор) и глубокую копию ресурса. Периодически проверяя, согласуется ли ресурс, на который указывает указатель, с глубокой копией, сохраненной в начале, мы знаем, был ли изменен кэшированный ресурс.
Однако то, включен ли мониторинг, зависит от переменных среды.KUBE_CACHE_MUTATION_DETECTOR
Затронуто: если эта переменная среды не задана, создается экземпляр sharedIndexInformer.dummyMutationDetector
, ничего не делает после запуска.
еслиKUBE_CACHE_MUTATION_DETECTOR
верно, созданный экземпляр sharedIndexInformerdefaultCacheMutationDetector
, экземпляр будет периодически проверять кеш с интервалом в 1 с. Если обнаружится, что кеш изменен, он вызовет функцию обработчика сбоя, а если функция не определена, вызовет панику.
Суммировать
То, что объясняется в этой статье, следует рассматривать как диспетчер контроллеров в узком смысле, ведь она не включает конкретного диспетчера ресурсов (контроллера), а только объясняет, как диспетчер контроллеров «управляет контроллером».
Видно, что диспетчер контроллера проделал большую работу, чтобы контроллер мог сосредоточиться только на обработке событий, которые его волнуют, и ядром этой работы является информер.Когда вы понимаете, как информер работает с другими компонентов, то Controller Manager является основой для менеджера ресурсов.Стало понятно, что произошло.