Advanced kubernetes: анализ механизма информера

Kubernetes

задний план

Informer — очень важный компонент kubernetes, отвечающий за синхронизацию ресурсов и событий между компонентами и apiserver. информеры используются во многих компонентах. Это может быть сложно понять на первый взгляд, но углубившись, вы получите более глубокое понимание механизма обработки событий kubernetes и возможных узких мест в будущем.

Проще говоря, информер отслеживает определенные ресурсы, принимает изменения этого типа ресурсов от аписервера, обрабатывает изменения ресурсов зарегистрированной пользователем callback-функцией и сохраняет измененные объекты в локальном кэше.

Анализ исходного кода

Для простоты я выбрал для анализа собственный контроллер kubernetes. Проект находится вsample controller, Исходный код относительно прост, подходит для начинающих, чтобы начать понимать.

Сначала посмотрите на схему всего контроллера-образца:

sample-controller架构

Желтая часть — это связанная с контроллером структура, включая рабочую очередь. Синяя часть — сопутствующий контент client-go, включая информер, рефлектор (фактически пакет информера), индексатор. С точки зрения процесса рефлектор получает изменения событий от аписервера через механизм list&watch, попадает в очередь Delta FIFO и обрабатывается информером. Информатор доставляет события из дельта-очереди FIFO компоненту индексатора, а компонент индексатора сохраняет события в локальном кэше. После этого, поскольку пользователь заранее пропишет различные callback-функции событий для информера, эти callback-функции будут обрабатываться по-разному для разных компонентов. Например, в контроллере объект будет поставлен в рабочую очередь, а затем обработан бизнес-логикой контроллера. При обработке ссылка на объект будет получена из кеша. То есть обработка ресурсов каждым компонентом ограничивается локальным кешем и не взаимодействует с аписервером, пока ресурс не обновится.

Различные идеи, принятые в этой архитектуре, такие как очереди сообщений и разделение чтения-записи, здесь не обсуждаются, ведь эти идеи уже давно получили широкое распространение и применяются к различным архитектурам. Давайте будем практичными и взглянем на проект Sample-Controller. включают

  • main.go, controller.go
  • pkg/apis: файл определения интерфейса, types.go, должен быть изменен разработчиком и содержит структуру пользовательского ресурса. все функции deepcopy генерируются генератором кода
  • pkg/signals: этот пакет не нужно запутывать, он фиксирует сигнал завершения на разных платформах.
  • PKG/generated: Содержит ClientSet, Informer, Lister, весь этот пакет генерируется автоматически

Рис. Верхний контроль, может быть, как правило, как правило, понимают, что отношения между каждым элементом и видом на сборку всего проекта, источником здесь мы смотрим на конкретный механизм информатора.

Найдите main.go и просмотрите пакеты, на которые он ссылается.

	kubeinformers "k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/clientcmd"
	
	clientset "k8s.io/sample-controller/pkg/generated/clientset/versioned"
	informers "k8s.io/sample-controller/pkg/generated/informers/externalversions"

Мы обнаружили, что используемая библиотека в основном клиентская.client-goда

Go clients for talking to a kubernetes cluster.

Предоставляет пользователю программный интерфейс. client-go может предоставить интерфейс программирования для собственных ресурсов kubernetes, и если мы хотим обрабатывать ресурсы crd, нам нужно обратиться к следующим двум пакетам: clientset и informers, которые основаны на содержимом, настроенном в pkg/apis, по коду -генератор сгенерирован. То есть настраиваем CRD, информер кастомного CRD, кастомную callback-функцию и бизнес-логику, а самый базовый фреймворк и механизм предоставляет kubernetes.

Посмотрите на несколько строк кода в функции main:

cfg, err := clientcmd.BuildConfigFromFlags(masterURL, kubeconfig)
kubeClient, err := kubernetes.NewForConfig(cfg)
exampleClient, err := clientset.NewForConfig(cfg)
    
kubeInformerFactory := kubeinformers.NewSharedInformerFactory(kubeClient, time.Second*30)
exampleInformerFactory := informers.NewSharedInformerFactory(exampleClient, time.Second*30)

Зависит отkubeconfigфайл илиmasterURLГенерировать конфиг и генерировать по конфигуkubeClientиexampleClient, первый используется для собственных ресурсов, а второй — для ресурсов CRD. позжеclientsetгенерироватьinformerFactory. На эту функцию стоит ориентироваться, первый параметр clientset, второйtime.Second*30даresyncPeriod. resyncPeriod относится к очистке локального кеша и созданию списка с apiserver каждый период времени. Это позволяет избежать ошибок бизнес-логики, вызванных ошибками в механизме list&watch, но в крупномасштабных кластерах стоимость повторного включения в список не следует недооценивать. Кому-то нравится устанавливать большее значение, а кому-то нравится устанавливать его на 0, то есть полностью доверять возможностям etcd.

NewSharedInformerFactory()Смотреть все возможное, вы можете увидеть следующую функцию

// NewSharedInformerFactoryWithOptions constructs a new instance of a SharedInformerFactory with additional options.
func NewSharedInformerFactoryWithOptions(client kubernetes.Interface, defaultResync time.Duration, options ...SharedInformerOption) SharedInformerFactory {
	factory := &sharedInformerFactory{
		client:           client,
		namespace:        v1.NamespaceAll,
		defaultResync:    defaultResync,
		informers:        make(map[reflect.Type]cache.SharedIndexInformer),
		startedInformers: make(map[reflect.Type]bool),
		customResync:     make(map[reflect.Type]time.Duration),
	}

	// Apply all options
	for _, opt := range options {
		factory = opt(factory)
	}

	return factory
}

Генерируется здесь фабрика (фабричный паттерн предполагает знание паттернов проектирования, если не понимаете, компенсируйте), фабричный паттернclientчто было передано раньшеclientset,namespaceдаv1.NamespaceAll, который фиксирует указанные ресурсы всех пространств имен,defaultResyncобъяснил ранее,customResyncВы можете сделать разные циклы ресинхронизации для каждого Информера.informersЭто список Информеров, который может генерировать клиент, и каждый ИНФОРМЕРcache.SharedIndexInformer,startedInformersУказывает, был ли запущен каждый информер.

Послеfactory=opt(factory), можно увидетьoptэто функция изoptionsвходящий, покаoptionsЭто параметр переменной длины (детская обувь, знакомая с C, может быстро понять).

получить две фабрикиkubeInformerFactoryиexampleInformerFactoryПосле этого мы можем сгенерироватьcontrollerОбъект:

controller := NewController(kubeClient, exampleClient,
		kubeInformerFactory.Apps().V1().Deployments(),
		exampleInformerFactory.Samplecontroller().V1alpha1().Foos())

NewController()Функция принимает четыре параметра, первые дваclientset, последние два зависят от ресурсаinformer. Мы прослушали два ресурса: деплоймент и Foo.kubeInformerFactory.Apps().V1().Deployments()Смысл этой строчки кода в том, чтобы выбрать информер ресурсов деплоя в фабрике, где apigroups — это app, а apiversion — это v1.интерфейс. Смысл интерфейса будет объяснен позже.

ВходитьNewController()После внутренней стороны следующий кодevent recorderконтент, который обычно используют пользователиkubectl get eventsС этим связаны команды. Дальнейший анализ в этой части проводиться не будет.

	eventBroadcaster := record.NewBroadcaster()
	eventBroadcaster.StartLogging(klog.Infof)
	eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: kubeclientset.CoreV1().Events("")})
	recorder := eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: controllerAgentName})

создаст новыйcontrollerобъект.

	controller := &Controller{
		kubeclientset:     kubeclientset,
		sampleclientset:   sampleclientset,
		deploymentsLister: deploymentInformer.Lister(),
		deploymentsSynced: deploymentInformer.Informer().HasSynced,
		foosLister:        fooInformer.Lister(),
		foosSynced:        fooInformer.Informer().HasSynced,
		workqueue:         workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "Foos"),
		recorder:          recorder,
	}

clientsetВсе они переданы, регистратор свежевыпущен выше, а контроллер также включает список и рабочую очередь. Принцип workqueue не относится к разряду клиент-го информера, если в другой статье будет конкретно анализироваться сэмпл-контроллер, то он будет введен. Мы с удивлением обнаружили, что листер на самом деле создавался методом информера, значит ли это, что уровень информера выше, чем у листера? Однако мы видимdeploymentInformerТип можно посмотреть

// DeploymentInformer provides access to a shared informer and lister for
// Deployments.
type DeploymentInformer interface {
	Informer() cache.SharedIndexInformer
	Lister() v1.DeploymentLister
}

Этот deploymentInformer - это просто интерфейс, используемыйInformer()иLister()метод для получения реальных экземпляров информера и листера. Вот почему мы говорили о входящихNewController()Информер - это всего лишь интерфейс. тогда смотриInformer()метод, который фактически инициализирует экземпляр информера.

func (f *deploymentInformer) Informer() cache.SharedIndexInformer {
	return f.factory.InformerFor(&appsv1.Deployment{}, f.defaultInformer)
}

вInformerFor()Определите конкретную реализацию в client-go/informers/factory.go:

// InternalInformerFor returns the SharedIndexInformer for obj using an internal
// client.
func (f *sharedInformerFactory) InformerFor(obj runtime.Object, newFunc internalinterfaces.NewInformerFunc) cache.SharedIndexInformer {
	f.lock.Lock()
	defer f.lock.Unlock()

	informerType := reflect.TypeOf(obj)
	informer, exists := f.informers[informerType]
	if exists {
		return informer
	}

	resyncPeriod, exists := f.customResync[informerType]
	if !exists {
		resyncPeriod = f.defaultResync
	}

	informer = newFunc(f.client, resyncPeriod)
	f.informers[informerType] = informer

	return informer
}

Процесс относительно простой, если информер существует, прямое возвращение к экземпляру информирователя, а затем определить, будет ли указано иноеcustomResync, если не соблюдатьdefaultResync, следует отметить, что это значение по умолчанию не является константой, а значением, которое мы передали ранее, здесь является 30s. После этого позвонитеnewFunc()Создайте экземпляр информера. Эта новая функцияInformerFor()Аргумент , в функции предыдущего слоя в видеf.defaultInformer. Его конкретный вызовreturn NewFilteredDeploymentInformer(client, f.namespace, resyncPeriod, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, f.tweakListOptions).

Если вы помните первоначальный процесс, когда мы создавали InformerFactory, мы указали пространство имен как All и tweakListOptions как пустые.Приведенные выше параметры показывают здесь только свою функцию. Кроме того, здесь есть предзнаменование, указали Индексаторы.NamespaceIndexзаMetaNamespaceIndexFunc. Это предзнаменование включает в себя механизм индексатора, мы не будем подробно его сейчас раскрывать, просто знайте, что это означает, что объекты могут быть классифицированы в соответствии с пространством имен при их сохранении. см. далееNewFilteredDeploymentInformer()Внутри функции:

// NewFilteredDeploymentInformer constructs a new informer for Deployment type.
// Always prefer using an informer factory to get a shared informer instead of getting an independent
// one. This reduces memory footprint and number of connections to the server.
func NewFilteredDeploymentInformer(client kubernetes.Interface, namespace string, resyncPeriod time.Duration, indexers cache.Indexers, tweakListOptions internalinterfaces.TweakListOptionsFunc) cache.SharedIndexInformer {
	return cache.NewSharedIndexInformer(
		&cache.ListWatch{
			ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
				if tweakListOptions != nil {
					tweakListOptions(&options)
				}
				return client.AppsV1().Deployments(namespace).List(options)
			},
			WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
				if tweakListOptions != nil {
					tweakListOptions(&options)
				}
				return client.AppsV1().Deployments(namespace).Watch(options)
			},
		},
		&appsv1.Deployment{},
		resyncPeriod,
		indexers,
	)
}

Это фактически инициализирует экземпляр информера. Здесь зарегистрирована функция обратного вызова List&Watch.Коротко говоря, функция обратного вызова list&watch выглядит следующим образом:

	err = c.client.Get().
		Namespace(c.ns).
		Resource("deployments").
		VersionedParams(&opts, scheme.ParameterCodec).
		Timeout(timeout).
		Do().
		Into(result)

Типичный запрос RESTful.

Создайте экземпляр информера с помощьюNewSharedIndexInformer()Функция завершена, и входными параметрами являются ListWatcher (функция обратного вызова, упомянутая выше), объект среды выполнения (здесь развертывание) и индексаторы. индексаторы — это не локальный кеш, аmap[string]Funcтип, в соответствии с кодом выше, здесь строкаNamespaceIndex, функцияMetaNamespaceIndexFunc.sharedIndexInformerСодержит следующее:

	sharedIndexInformer := &sharedIndexInformer{
		processor:                       &sharedProcessor{clock: realClock},
		indexer:                         NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, indexers),
		listerWatcher:                   lw,
		objectType:                      objType,
		resyncCheckPeriod:               defaultEventHandlerResyncPeriod,
		defaultEventHandlerResyncPeriod: defaultEventHandlerResyncPeriod,
		cacheMutationDetector:           NewCacheMutationDetector(fmt.Sprintf("%T", objType)),
		clock:                           realClock,
	}

processerЭто очень важный модуль, но только по коду здесь сложно понять, что делает процессор, мы подробно разберем процессор позже. А пока сосредоточимся наNewIndex():

// NewIndexer returns an Indexer implemented simply with a map and a lock.
func NewIndexer(keyFunc KeyFunc, indexers Indexers) Indexer {
	return &cache{
		cacheStorage: NewThreadSafeStore(indexers, Indices{}),
		keyFunc:      keyFunc,
	}
}

Он создал кеш, онkeyFuncдаDeletionHandlingMetaNamespaceKeyFunc, то есть принять объект, сгенерировать егоnamepace/nameНить. Как следует из названия, кеш — это слой кеша, но это не слой, который фактически хранит данные, а

  1. Computing keys for objects via keyFunc
  2. Invoking methods of a ThreadSafeStorage interface

Реальные данные хранятся в кешеThreadSafeStore. Чтобы разобраться в идеях, сначала создайте informerFactory, а затем инициируйте создание экземпляра информера во время создания контроллера, который инициирует создание индексатора, а индекс создает новый threadSafeStore.NewThreadSafeStore()Передаются два параметра: indexers и Indices{}. Как упоминалось ранее, indexers — это карта IndexFunc, а Indices{} — это карта Index, а Index — это карта.map[string]sets.String.

Этого уровня хранения немного, с примерами, поясняющими следующее: например, когда мы хотим добавить объект, вызов кешаcache.Add(), превратить объект в ключ через keyFunc. Очевидно, наш пример становитсяnamepace/name. звонить послеc.cacheStorage.Add(key, obj), здесь мы создаем новый threadSafeMap, поэтому вAdd()позвонил вc.updateIndices(). Этот процесс немного более извилистый, и исходный код размещен ниже. После удаления старого объекта переберите все c.indexers. Как упоминалось выше, есть только один индексатор, и имяNamespaceIndexРоль INDEXFUNC состоит в том, чтобы получить объект пространства имен. Таким образом, мы получаем indexvalues, это объект пространства имен. Итак, мы прошлиindices[NamespaceIndex]Получить индекс, в котором должен храниться объект.Проще говоря, индекс соответствует пространству имен. После этого мы просматриваем indexValues, Поскольку в настоящее время у нас есть только одно пространство имен, мы напрямую находимset := index[namespace], где множествоmap[string]{struct}, поэтому после того, как набор вставлен в объект, он становится типом ключевого объекта. Таким образом, k индексов — это имя indexFunc, v — это индекс, k индексов — это значение indexFunc(obj), а v — этоmap[string]struct{}, где k — значение keyFunc(obj), а v — содержимое obj. Это действительно немного сбивает с толку, но это не влияет на наше понимание механизма информера.

func (c *threadSafeMap) updateIndices(oldObj interface{}, newObj interface{}, key string) {
	// if we got an old object, we need to remove it before we add it again
	if oldObj != nil {
		c.deleteFromIndices(oldObj, key)
	}
	for name, indexFunc := range c.indexers {
		indexValues, err := indexFunc(newObj)
		if err != nil {
			panic(fmt.Errorf("unable to calculate an index entry for key %q on index %q: %v", key, name, err))
		}
		index := c.indices[name]
		if index == nil {
			index = Index{}
			c.indices[name] = index
		}

		for _, indexValue := range indexValues {
			set := index[indexValue]
			if set == nil {
				set = sets.String{}
				index[indexValue] = set
			}
			set.Insert(key)
		}
	}
}

Разобравшись с проблемой хранения, вернемся к sharedIndexInformer и посмотрим, что еще мы не рассмотрели. Индексатор — это интерфейс для получения базового хранилища, а с хранилищем мы разобрались, процессор, resyncCheckPeriod, defaultEventHandlerResyncPeriod и часы также упоминались, а о том, что делает процессор, мы поговорим подробнее позже, cacheMutationDetector фактически не используется , его можно игнорировать; listWatcher, о котором мы также говорили; objectType, запускается, останавливается Само собой разумеется, blockDeltas может приостанавливать обработку событий в дельта-очереди FIFO, позволяя безопасно присоединиться к новому обработчику событий. Теперь у нас остался только контроллер.

type sharedIndexInformer struct {
	indexer    Indexer
	controller Controller

	processor             *sharedProcessor
	cacheMutationDetector CacheMutationDetector

	// This block is tracked to handle late initialization of the controller
	listerWatcher ListerWatcher
	objectType    runtime.Object

	// resyncCheckPeriod is how often we want the reflector's resync timer to fire so it can call
	// shouldResync to check if any of our listeners need a resync.
	resyncCheckPeriod time.Duration
	// defaultEventHandlerResyncPeriod is the default resync period for any handlers added via
	// AddEventHandler (i.e. they don't specify one and just want to use the shared informer's default
	// value).
	defaultEventHandlerResyncPeriod time.Duration
	// clock allows for testability
	clock clock.Clock

	started, stopped bool
	startedLock      sync.Mutex

	// blockDeltas gives a way to stop all event distribution so that a late event handler
	// can safely join the shared informer.
	blockDeltas sync.Mutex
}

назад, сейчас жеNewController(), мы только что проанализировалиInformer(), а теперь посмотриdeploymentInformer.Informer().HasSynced. Когда я ранее объяснял рабочий процесс, я сказал, что компонент сначала выполнит список, а затем продолжит просмотр. Завершение действия списка зависит от каждого информатора.HasSyncedправда.

После этого регистрируем callback-функцию события для экземпляра информера:

	deploymentInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc: controller.handleObject,
		UpdateFunc: func(old, new interface{}) {
			newDepl := new.(*appsv1.Deployment)
			oldDepl := old.(*appsv1.Deployment)
			if newDepl.ResourceVersion == oldDepl.ResourceVersion {
				// Periodic resync will send update events for all known Deployments.
				// Two different versions of the same Deployment will always have different RVs.
				return
			}
			controller.handleObject(new)
		},
		DeleteFunc: controller.handleObject,
	})

Функции обратного вызова делятся на три категории: добавить, обновить и удалить. Каждое действие может зарегистрировать другую функцию обратного вызова. В этом примере регистрация одинакова. Если вы войдетеhandleObject(), вы можете обнаружить, что все, что он делает, помещаетсяfooобъект добавлен в контроллерworkqueueсередина. Но мы пока не будем рассматривать эту часть.

пройти черезNewController()Мы получили объект контроллера, который содержит список, регистратор, набор клиентов. А информер мы инициализировали ранее, поэтому следующим шагом будет запуск контроллера и информера и пусть они работают вместе.

	kubeInformerFactory.Start(stopCh)
	exampleInformerFactory.Start(stopCh)

	if err = controller.Run(2, stopCh); err != nil {
		klog.Fatalf("Error running controller: %s", err.Error())
	}

Функция, которая запускает информер, - это каждый заводStart()функция, реальная реализация этой функции находится в client-go/informers/factory.go,

	for informerType, informer := range f.informers {
		if !f.startedInformers[informerType] {
			go informer.Run(stopCh)
			f.startedInformers[informerType] = true
		}
	}

Запускайте каждый информер по отдельности и отмечайте их статус как запущенные.Run()Функция реализована в client-go/tools/cache/shared_informer.go вfunc (s *sharedIndexInformer) Run(stopCh <-chan struct{})Эти примеры показывают, что индекс акций информатора, который является общим локальным кешем.Run()Функция представляет еще один очень важный компонент: дельта-FIFO. Посмотрите на процесс создания:fifo := NewDeltaFIFO(MetaNamespaceKeyFunc, s.indexer). Есть два параметра, одинMetaNamespaceKeyFunc, о котором несколько раз упоминалось выше, состоит в том, чтобы получить объектnamespace/name, другойs.indexer, — это интерфейс для получения базового хранилища.

func NewDeltaFIFO(keyFunc KeyFunc, knownObjects KeyListerGetter) *DeltaFIFO {
	f := &DeltaFIFO{
		items:        map[string]Deltas{},
		queue:        []string{},
		keyFunc:      keyFunc,
		knownObjects: knownObjects,
	}
	f.cond.L = &f.lock
	return f
}

Более важное содержимое Deltafifo включаетitems, queueиknownObjects, где предметыmap[string]Deltas{}, где вводятся понятия delta и knownObjects. Дельта относится к изменению объекта, то есть к приращению ресурсов, что очень легко понять. По нашему интуитивному восприятию мы можем обработать событие, зачем нам knownObjects? Причина в том, что идет ресинхронизация, а в дельта-очереди FIFO много содержимого, если оно не обработано, если в это время выполняется ресинхронизация, дельта удаления некоторых объектов, которые были удалены в etcd будет безвозвратно потерян. На этом этапе knownObjects знает, какие объекты были удалены, и регенерирует дельту удаления, которая не была обработана вовремя. Не будем вдаваться в подробности известных объектов, посмотримDeltasДетали. Дельты представляют собой набор дельт:type Deltas []Delta, в то время как структура Дельты действие+контент

// Delta is the type stored by a DeltaFIFO. It tells you what change
// happened, and the object's state after* that change.
//
// [*] Unless the change is a deletion, and then you'll get the final
//     state of the object before it was deleted.
type Delta struct {
	Type   DeltaType
	Object interface{}
}

Typeможет быть одним из четырех

  • Added
  • Updated
  • Deleted
  • Синхронизация: используется только в RESYNC

Соответствует точно три события, прежде чем он придет. Здесь есть вопрос: почему хранятся в Deltafifo Direct Depast Delta Deltas без него? Поскольку он может быть объединен, уменьшая количество событий обработки. Когда пользователь постоянно выполняет множество операций, прежде чем они будут объединены в DELTAS, уменьшают стресс компонента. Подробные разговоры позже о том, как добиться слияния дельты.

Теперь вернитесь кRun()функция, мы далее вводимsharedIndexInformerПоследний компонент: контроллер. Этот контроллер является не сэмпл-контроллером, а контроллером в информере. После создания дельта-FIFO инициализируйте конфигурацию, требуемую контроллером.

	cfg := &Config{
		Queue:            fifo,
		ListerWatcher:    s.listerWatcher,
		ObjectType:       s.objectType,
		FullResyncPeriod: s.resyncCheckPeriod,
		RetryOnError:     false,
		ShouldResync:     s.processor.shouldResync,

		Process: s.HandleDeltas,
	}

Fifo передается прозрачно, и теперь он называется очередью; listerWatcher и objectType все прозрачно передаются; все вопросы, связанные с ресинхронизацией, связаны с процессором, который связан с механизмом list&watch; обратите вниманиеconfig.Processназначить какs.HandleDeltasЭто обработчик дельты, и позже говорилось о том, как он будет работать в связке с threadSafeStore. После завершения инициализации настройте конфигурацию с новым контроллером, после запуска s.processor запустите s.controller. Отмечается, что в порядке отсрочки контроллер получает стоп-сигналstopCh, который передается с верхнего уровня и используется сэмпл-контроллером, информером и т. д.; вновь созданныйprocessorStopCh, при завершении программы гарантируется, что контроллер получает stopCh, останавливает контроллер и закрываетprocessorStopCh, пока процессор проходитwg.StartWithChannel()Гарантируется, что после закрытия канала процессор также остановится. Об этом говорят комментарии

Separate stop channel because Processor should be stopped strictly after controller

Итак, вопрос в том, что именно делает процессор?wg.StartWithChannel(processorStopCh, s.processor.run)Запускается SharedProcessor, который сначала добавляет блокировку чтения-записи, блокирует чтение и присоединяется ко всем слушателям. Каждый слушатель является процессорным прослушивателем, а затем отдельноp.wg.Start(listener.run)иp.wg.Start(listener.pop).字面上来说,sharedProcessor将事件分发给各个processor处理,然而但从这边我们还是不知道processor的功能。

Сначала отложим процессор, посмотримs.controller.Run()что вы наделали.

// Run begins processing items, and will continue until a value is sent down stopCh.
// It's an error to call Run more than once.
// Run blocks; call via go.
func (c *controller) Run(stopCh <-chan struct{}) {
	defer utilruntime.HandleCrash()
	go func() {
		<-stopCh
		c.config.Queue.Close()
	}()
	r := NewReflector(
		c.config.ListerWatcher,
		c.config.ObjectType,
		c.config.Queue,
		c.config.FullResyncPeriod,
	)
	r.ShouldResync = c.config.ShouldResync
	r.clock = c.clock

	c.reflectorMutex.Lock()
	c.reflector = r
	c.reflectorMutex.Unlock()

	var wg wait.Group
	defer wg.Wait()

	wg.StartWithChannel(stopCh, r.Run)

	wait.Until(c.processLoop, time.Second, stopCh)
}

Здесь представлена ​​новая вещь, отражатель.NewReflector()В него передаются четыре переменные: listWatcher, objectType, queue, resyncPeriod. Очередь на самом деле представляет собой дельта-FIFO.В сочетании с listWatcher и resyncPeriod мы можем легко догадаться, что рефлектор отвечает за взаимодействие с apiserver для получения изменений в ресурсах. Итак, мы можем продолжитьr.Runиwait.Until(c.processLoop, time.Second, stopCh)две функции. Получив сигнал stopCh, остановитьprocessLoopКонцевой отражатель. Первый взглядr.Run, логика внутри очень простая, зацикливание навсегдаListAndWatch, он перезапустится, даже если что-то неожиданно пойдет не так.

Механизм ListWatch не анализирует здесь исходный код, в принципе, он передаетlist, err = pager.List(context.Background(), options)Каждый раз, когда список получает объект, он всегда будет отображать resourceVersion как 0 при выполнении этого списка. проходить послеlistMetaInterface.GetResourceVersion()Получить resourceVersion и передатьsyncWith()Замените номер версии в локальном кеше. После того, как список будет заполнен, будет запущена процедура go для периодической повторной синхронизации. После этого он войдет в бесконечный цикл и выполнит операцию наблюдения. И каждый раз после просмотра события запускатьr.watchHandler(), который получит тип и содержимое события через механизм отражения и вызовет различные функции обратного вызова в зависимости от типа. В коде обновление контента будет своевременно обновляться до дельта-FIFO. То есть через механизм list&watch получаются изменения ресурсов. Следующий код может заметить, что шаблон Type+content точно такой же, какDeltaтип совпадает.

			switch event.Type {
			case watch.Added:
				err := r.store.Add(event.Object)
				if err != nil {
					utilruntime.HandleError(fmt.Errorf("%s: unable to add watch event object (%#v) to store: %v", r.name, event.Object, err))
				}
			case watch.Modified:
				err := r.store.Update(event.Object)
				if err != nil {
					utilruntime.HandleError(fmt.Errorf("%s: unable to update watch event object (%#v) to store: %v", r.name, event.Object, err))
				}
			case watch.Deleted:
				// TODO: Will any consumers need access to the "last known
				// state", which is passed in event.Object? If so, may need
				// to change this.
				err := r.store.Delete(event.Object)
				if err != nil {
					utilruntime.HandleError(fmt.Errorf("%s: unable to delete watch event object (%#v) from store: %v", r.name, event.Object, err))
				}

прочитай этоr.Run,посмотриprocessLoop(). Он занимает 1 с как цикл и выполняется периодически Логика очень проста, вызовc.config.Queue.Pop(PopProcessFunc(c.config.Process)), то есть взять элемент из дельта-FIFO и обработать его. Вы можете перевернуть его, вышеупомянутый c.config.Processs.HandleDeltas. Логика тут тоже очень простая, он взаимодействует с индексатором sharedIndexInformer, то есть кеша, обновляет содержимое, которое мы получаем из Delta FIFO, в нижележащий threadSafeMap, а затем передаетs.processor.distribute()Распространите сообщение.

        case Sync, Added, Updated:
			isSync := d.Type == Sync
			s.cacheMutationDetector.AddObject(d.Object)
			if old, exists, err := s.indexer.Get(d.Object); err == nil && exists {
				if err := s.indexer.Update(d.Object); err != nil {
					return err
				}
				s.processor.distribute(updateNotification{oldObj: old, newObj: d.Object}, isSync)
			} else {
				if err := s.indexer.Add(d.Object); err != nil {
					return err
				}
				s.processor.distribute(addNotification{newObj: d.Object}, isSync)
			}
		case Deleted:
			if err := s.indexer.Delete(d.Object); err != nil {
				return err
			}
			s.processor.distribute(deleteNotification{oldObj: d.Object}, false)
		}

В дистрибутиве через sharedProcesser проходитlistener.add(obj)Распространите этот объект на каждого слушателя. И функция снова выполняетсяp.addCh <- notification. Увидев это, мы можем, наконец, совместить функции вышеописанного выпрямляющего процессора. Чтобы разобраться с идеями, контроллер (не семпл-контроллер) переупаковывает свой собственный модуль в качестве рефлектора во время инициализации, а рефлектор изменяет изменения ресурса на тип+объект через механизм list&watch.DeltaПоместите его в очередь Delta FIFO, и контроллер пройдетprocessLoopПолучите объект из дельта-очереди FIFO и обновите последний ресурс до threadSafeMa. На самом деле, грубо говоря, рефлектор — это часть контроллера, а контроллер — это часть информера, в итоге мы можем обнаружить, что информер получает события и синхронизирует ресурсы через list&watch, а простая очередь сообщений производится через дельта-FIFO.С одной стороны удобно реализовать режим действие+контент для удобства работыУдалитьТакое поведение, с одной стороны, очередь сообщений сжимает событие, что повышает эффективность обработки.

Теперь вернитесь и посмотрите на процессор. Когда дело доходит до вершины дельты FIFOpop()и, наконец, инкапсулировать объект какnotifcation, передать в доп. Напомним, что в функции Run() объекта sharedIndexInformer последний вызов выполняетсяwg.StartWithChannel(processorStopCh, s.processor.run), который, в свою очередь, вызывает всех слушателейp.wg.Start(listener.run)иp.wg.Start(listener.pop), сначала посмотрите на функцию pop:

func (p *processorListener) pop() {
	defer utilruntime.HandleCrash()
	defer close(p.nextCh) // Tell .run() to stop

	var nextCh chan<- interface{}
	var notification interface{}
	for {
		select {
		case nextCh <- notification:
			// Notification dispatched
			var ok bool
			notification, ok = p.pendingNotifications.ReadOne()
			if !ok { // Nothing to pop
				nextCh = nil // Disable this select case
			}
		case notificationToAdd, ok := <-p.addCh:
			if !ok {
				return
			}
			if notification == nil { // No notification to pop (and pendingNotifications is empty)
				// Optimize the case - skip adding to pendingNotifications
				notification = notificationToAdd
				nextCh = p.nextCh
			} else { // There is already a notification waiting to be dispatched
				p.pendingNotifications.WriteOne(notificationToAdd)
			}
		}
	}
}

вышеприведенный абзацpop()Очень красиво, несколько функций реализованы в одной функции. Когда addCh получает сигнал, то есть после взятия дельты из дельта-FIFO, определяется, пусто ли уведомление. Вообще говоря, уведомление всегда читается из pendingNoditication.Когда буфер пуст, уведомление пусто после обработки текущего события. В этом случае при появлении нового события снова устанавливается уведомление. Когда уведомление не пусто, содержимое уведомления передается в nextCh, а другое уведомление извлекается из буфера. Объяснять очень запутанно.Короче говоря, принцип круговой очереди, которая принимает события в дельта-FIFO через addCh, сохраняет эти события через уведомление и буфер, и отправляет их через nextCh.

Логика nextCh находится вrun(), вы можете понять, взглянув на следующий код, в соответствии с разными типами событий выполняется различная обработка. Здесь p.handler — это обработчик событий, который мы видели в основной функции проекта.Мы ранее зарегистрировали функции обратного вызова с помощью eventHandler, и эти функции обратного вызова на самом деле вызываются здесь.

				case updateNotification:
					p.handler.OnUpdate(notification.oldObj, notification.newObj)
				case addNotification:
					p.handler.OnAdd(notification.newObj)
				case deleteNotification:
					p.handler.OnDelete(notification.oldObj)

На данный момент мы проанализировали все механизмы информера. Что касается некоторых дополнительных деталей, таких как объединение событий в дельта-FIFO и т. д., то они не будут анализироваться из-за нехватки места. На самом деле упомянутое выше содержание очень простое с верхнего уровня, разработчикам даже не нужно четко понимать лежащий в основе принцип, они могут использовать механизм Информера для программирования и реализации простого образца-контроллера. Дизайн информера.Очень симпатичный.

использовать

С точки зрения неспециалиста, чтобы быстро приступить к программированию механизма информера, достаточно запомнить следующие шаги.

  • Зависит отclientcmd.BuildConfigFromFlagsСоздать конфигурацию
  • передается по конфигурацииNewForConfigгенерировать набор клиентов
  • по набору клиентовNewSharedInformerFactoryпостроить завод
  • Зарегистрируйте eventHandler с конкретным экземпляром Informer, который вы хотите
  • перечислитьfactory.Start()

На данный момент вы можете получить все данные в etcd через clientset! Так же есть небольшое замечание, клиентсет может общаться напрямую с аписервером, а должен общаться с аписервером напрямую через клиентсет во время операций записи, при операциях чтения пытаться получить данные из локального кеша через листер, а потому полученный data — это не копия, а ссылка, поэтому сначала сделайте глубокую копию, а потом уже обновляйте.