задний план
Informer — очень важный компонент kubernetes, отвечающий за синхронизацию ресурсов и событий между компонентами и apiserver. информеры используются во многих компонентах. Это может быть сложно понять на первый взгляд, но углубившись, вы получите более глубокое понимание механизма обработки событий kubernetes и возможных узких мест в будущем.
Проще говоря, информер отслеживает определенные ресурсы, принимает изменения этого типа ресурсов от аписервера, обрабатывает изменения ресурсов зарегистрированной пользователем callback-функцией и сохраняет измененные объекты в локальном кэше.
Анализ исходного кода
Для простоты я выбрал для анализа собственный контроллер kubernetes. Проект находится вsample controller, Исходный код относительно прост, подходит для начинающих, чтобы начать понимать.
Сначала посмотрите на схему всего контроллера-образца:
Желтая часть — это связанная с контроллером структура, включая рабочую очередь. Синяя часть — сопутствующий контент client-go, включая информер, рефлектор (фактически пакет информера), индексатор. С точки зрения процесса рефлектор получает изменения событий от аписервера через механизм list&watch, попадает в очередь Delta FIFO и обрабатывается информером. Информатор доставляет события из дельта-очереди FIFO компоненту индексатора, а компонент индексатора сохраняет события в локальном кэше. После этого, поскольку пользователь заранее пропишет различные callback-функции событий для информера, эти callback-функции будут обрабатываться по-разному для разных компонентов. Например, в контроллере объект будет поставлен в рабочую очередь, а затем обработан бизнес-логикой контроллера. При обработке ссылка на объект будет получена из кеша. То есть обработка ресурсов каждым компонентом ограничивается локальным кешем и не взаимодействует с аписервером, пока ресурс не обновится.
Различные идеи, принятые в этой архитектуре, такие как очереди сообщений и разделение чтения-записи, здесь не обсуждаются, ведь эти идеи уже давно получили широкое распространение и применяются к различным архитектурам. Давайте будем практичными и взглянем на проект Sample-Controller. включают
- main.go, controller.go
- pkg/apis: файл определения интерфейса, types.go, должен быть изменен разработчиком и содержит структуру пользовательского ресурса. все функции deepcopy генерируются генератором кода
- pkg/signals: этот пакет не нужно запутывать, он фиксирует сигнал завершения на разных платформах.
- PKG/generated: Содержит ClientSet, Informer, Lister, весь этот пакет генерируется автоматически
Рис. Верхний контроль, может быть, как правило, как правило, понимают, что отношения между каждым элементом и видом на сборку всего проекта, источником здесь мы смотрим на конкретный механизм информатора.
Найдите main.go и просмотрите пакеты, на которые он ссылается.
kubeinformers "k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/clientcmd"
clientset "k8s.io/sample-controller/pkg/generated/clientset/versioned"
informers "k8s.io/sample-controller/pkg/generated/informers/externalversions"
Мы обнаружили, что используемая библиотека в основном клиентская.client-goда
Go clients for talking to a kubernetes cluster.
Предоставляет пользователю программный интерфейс. client-go может предоставить интерфейс программирования для собственных ресурсов kubernetes, и если мы хотим обрабатывать ресурсы crd, нам нужно обратиться к следующим двум пакетам: clientset и informers, которые основаны на содержимом, настроенном в pkg/apis, по коду -генератор сгенерирован. То есть настраиваем CRD, информер кастомного CRD, кастомную callback-функцию и бизнес-логику, а самый базовый фреймворк и механизм предоставляет kubernetes.
Посмотрите на несколько строк кода в функции main:
cfg, err := clientcmd.BuildConfigFromFlags(masterURL, kubeconfig)
kubeClient, err := kubernetes.NewForConfig(cfg)
exampleClient, err := clientset.NewForConfig(cfg)
kubeInformerFactory := kubeinformers.NewSharedInformerFactory(kubeClient, time.Second*30)
exampleInformerFactory := informers.NewSharedInformerFactory(exampleClient, time.Second*30)
Зависит отkubeconfig
файл илиmasterURL
Генерировать конфиг и генерировать по конфигуkubeClient
иexampleClient
, первый используется для собственных ресурсов, а второй — для ресурсов CRD. позжеclientset
генерироватьinformerFactory
. На эту функцию стоит ориентироваться, первый параметр clientset, второйtime.Second*30
даresyncPeriod
. resyncPeriod относится к очистке локального кеша и созданию списка с apiserver каждый период времени. Это позволяет избежать ошибок бизнес-логики, вызванных ошибками в механизме list&watch, но в крупномасштабных кластерах стоимость повторного включения в список не следует недооценивать. Кому-то нравится устанавливать большее значение, а кому-то нравится устанавливать его на 0, то есть полностью доверять возможностям etcd.
NewSharedInformerFactory()
Смотреть все возможное, вы можете увидеть следующую функцию
// NewSharedInformerFactoryWithOptions constructs a new instance of a SharedInformerFactory with additional options.
func NewSharedInformerFactoryWithOptions(client kubernetes.Interface, defaultResync time.Duration, options ...SharedInformerOption) SharedInformerFactory {
factory := &sharedInformerFactory{
client: client,
namespace: v1.NamespaceAll,
defaultResync: defaultResync,
informers: make(map[reflect.Type]cache.SharedIndexInformer),
startedInformers: make(map[reflect.Type]bool),
customResync: make(map[reflect.Type]time.Duration),
}
// Apply all options
for _, opt := range options {
factory = opt(factory)
}
return factory
}
Генерируется здесь фабрика (фабричный паттерн предполагает знание паттернов проектирования, если не понимаете, компенсируйте), фабричный паттернclient
что было передано раньшеclientset
,namespace
даv1.NamespaceAll
, который фиксирует указанные ресурсы всех пространств имен,defaultResync
объяснил ранее,customResync
Вы можете сделать разные циклы ресинхронизации для каждого Информера.informers
Это список Информеров, который может генерировать клиент, и каждый ИНФОРМЕРcache.SharedIndexInformer
,startedInformers
Указывает, был ли запущен каждый информер.
Послеfactory=opt(factory)
, можно увидетьopt
это функция изoptions
входящий, покаoptions
Это параметр переменной длины (детская обувь, знакомая с C, может быстро понять).
получить две фабрикиkubeInformerFactory
иexampleInformerFactory
После этого мы можем сгенерироватьcontroller
Объект:
controller := NewController(kubeClient, exampleClient,
kubeInformerFactory.Apps().V1().Deployments(),
exampleInformerFactory.Samplecontroller().V1alpha1().Foos())
NewController()
Функция принимает четыре параметра, первые дваclientset
, последние два зависят от ресурсаinformer
. Мы прослушали два ресурса: деплоймент и Foo.kubeInformerFactory.Apps().V1().Deployments()
Смысл этой строчки кода в том, чтобы выбрать информер ресурсов деплоя в фабрике, где apigroups — это app, а apiversion — это v1.интерфейс. Смысл интерфейса будет объяснен позже.
ВходитьNewController()
После внутренней стороны следующий кодevent recorderконтент, который обычно используют пользователиkubectl get events
С этим связаны команды. Дальнейший анализ в этой части проводиться не будет.
eventBroadcaster := record.NewBroadcaster()
eventBroadcaster.StartLogging(klog.Infof)
eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: kubeclientset.CoreV1().Events("")})
recorder := eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: controllerAgentName})
создаст новыйcontroller
объект.
controller := &Controller{
kubeclientset: kubeclientset,
sampleclientset: sampleclientset,
deploymentsLister: deploymentInformer.Lister(),
deploymentsSynced: deploymentInformer.Informer().HasSynced,
foosLister: fooInformer.Lister(),
foosSynced: fooInformer.Informer().HasSynced,
workqueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "Foos"),
recorder: recorder,
}
clientset
Все они переданы, регистратор свежевыпущен выше, а контроллер также включает список и рабочую очередь. Принцип workqueue не относится к разряду клиент-го информера, если в другой статье будет конкретно анализироваться сэмпл-контроллер, то он будет введен. Мы с удивлением обнаружили, что листер на самом деле создавался методом информера, значит ли это, что уровень информера выше, чем у листера? Однако мы видимdeploymentInformer
Тип можно посмотреть
// DeploymentInformer provides access to a shared informer and lister for
// Deployments.
type DeploymentInformer interface {
Informer() cache.SharedIndexInformer
Lister() v1.DeploymentLister
}
Этот deploymentInformer - это просто интерфейс, используемыйInformer()
иLister()
метод для получения реальных экземпляров информера и листера. Вот почему мы говорили о входящихNewController()
Информер - это всего лишь интерфейс. тогда смотриInformer()
метод, который фактически инициализирует экземпляр информера.
func (f *deploymentInformer) Informer() cache.SharedIndexInformer {
return f.factory.InformerFor(&appsv1.Deployment{}, f.defaultInformer)
}
вInformerFor()
Определите конкретную реализацию в client-go/informers/factory.go:
// InternalInformerFor returns the SharedIndexInformer for obj using an internal
// client.
func (f *sharedInformerFactory) InformerFor(obj runtime.Object, newFunc internalinterfaces.NewInformerFunc) cache.SharedIndexInformer {
f.lock.Lock()
defer f.lock.Unlock()
informerType := reflect.TypeOf(obj)
informer, exists := f.informers[informerType]
if exists {
return informer
}
resyncPeriod, exists := f.customResync[informerType]
if !exists {
resyncPeriod = f.defaultResync
}
informer = newFunc(f.client, resyncPeriod)
f.informers[informerType] = informer
return informer
}
Процесс относительно простой, если информер существует, прямое возвращение к экземпляру информирователя, а затем определить, будет ли указано иноеcustomResync
, если не соблюдатьdefaultResync
, следует отметить, что это значение по умолчанию не является константой, а значением, которое мы передали ранее, здесь является 30s. После этого позвонитеnewFunc()
Создайте экземпляр информера. Эта новая функцияInformerFor()
Аргумент , в функции предыдущего слоя в видеf.defaultInformer
. Его конкретный вызовreturn NewFilteredDeploymentInformer(client, f.namespace, resyncPeriod, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, f.tweakListOptions)
.
Если вы помните первоначальный процесс, когда мы создавали InformerFactory, мы указали пространство имен как All и tweakListOptions как пустые.Приведенные выше параметры показывают здесь только свою функцию. Кроме того, здесь есть предзнаменование, указали Индексаторы.NamespaceIndex
заMetaNamespaceIndexFunc
. Это предзнаменование включает в себя механизм индексатора, мы не будем подробно его сейчас раскрывать, просто знайте, что это означает, что объекты могут быть классифицированы в соответствии с пространством имен при их сохранении. см. далееNewFilteredDeploymentInformer()
Внутри функции:
// NewFilteredDeploymentInformer constructs a new informer for Deployment type.
// Always prefer using an informer factory to get a shared informer instead of getting an independent
// one. This reduces memory footprint and number of connections to the server.
func NewFilteredDeploymentInformer(client kubernetes.Interface, namespace string, resyncPeriod time.Duration, indexers cache.Indexers, tweakListOptions internalinterfaces.TweakListOptionsFunc) cache.SharedIndexInformer {
return cache.NewSharedIndexInformer(
&cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
if tweakListOptions != nil {
tweakListOptions(&options)
}
return client.AppsV1().Deployments(namespace).List(options)
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
if tweakListOptions != nil {
tweakListOptions(&options)
}
return client.AppsV1().Deployments(namespace).Watch(options)
},
},
&appsv1.Deployment{},
resyncPeriod,
indexers,
)
}
Это фактически инициализирует экземпляр информера. Здесь зарегистрирована функция обратного вызова List&Watch.Коротко говоря, функция обратного вызова list&watch выглядит следующим образом:
err = c.client.Get().
Namespace(c.ns).
Resource("deployments").
VersionedParams(&opts, scheme.ParameterCodec).
Timeout(timeout).
Do().
Into(result)
Типичный запрос RESTful.
Создайте экземпляр информера с помощьюNewSharedIndexInformer()
Функция завершена, и входными параметрами являются ListWatcher (функция обратного вызова, упомянутая выше), объект среды выполнения (здесь развертывание) и индексаторы. индексаторы — это не локальный кеш, аmap[string]Func
тип, в соответствии с кодом выше, здесь строкаNamespaceIndex
, функцияMetaNamespaceIndexFunc
.sharedIndexInformer
Содержит следующее:
sharedIndexInformer := &sharedIndexInformer{
processor: &sharedProcessor{clock: realClock},
indexer: NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, indexers),
listerWatcher: lw,
objectType: objType,
resyncCheckPeriod: defaultEventHandlerResyncPeriod,
defaultEventHandlerResyncPeriod: defaultEventHandlerResyncPeriod,
cacheMutationDetector: NewCacheMutationDetector(fmt.Sprintf("%T", objType)),
clock: realClock,
}
processer
Это очень важный модуль, но только по коду здесь сложно понять, что делает процессор, мы подробно разберем процессор позже. А пока сосредоточимся наNewIndex()
:
// NewIndexer returns an Indexer implemented simply with a map and a lock.
func NewIndexer(keyFunc KeyFunc, indexers Indexers) Indexer {
return &cache{
cacheStorage: NewThreadSafeStore(indexers, Indices{}),
keyFunc: keyFunc,
}
}
Он создал кеш, онkeyFunc
даDeletionHandlingMetaNamespaceKeyFunc
, то есть принять объект, сгенерировать егоnamepace/nameНить. Как следует из названия, кеш — это слой кеша, но это не слой, который фактически хранит данные, а
- Computing keys for objects via keyFunc
- Invoking methods of a ThreadSafeStorage interface
Реальные данные хранятся в кешеThreadSafeStore
. Чтобы разобраться в идеях, сначала создайте informerFactory, а затем инициируйте создание экземпляра информера во время создания контроллера, который инициирует создание индексатора, а индекс создает новый threadSafeStore.NewThreadSafeStore()
Передаются два параметра: indexers и Indices{}. Как упоминалось ранее, indexers — это карта IndexFunc, а Indices{} — это карта Index, а Index — это карта.map[string]sets.String
.
Этого уровня хранения немного, с примерами, поясняющими следующее: например, когда мы хотим добавить объект, вызов кешаcache.Add()
, превратить объект в ключ через keyFunc. Очевидно, наш пример становитсяnamepace/name. звонить послеc.cacheStorage.Add(key, obj)
, здесь мы создаем новый threadSafeMap, поэтому вAdd()
позвонил вc.updateIndices()
. Этот процесс немного более извилистый, и исходный код размещен ниже. После удаления старого объекта переберите все c.indexers. Как упоминалось выше, есть только один индексатор, и имяNamespaceIndex
Роль INDEXFUNC состоит в том, чтобы получить объект пространства имен. Таким образом, мы получаем indexvalues, это объект пространства имен. Итак, мы прошлиindices[NamespaceIndex]
Получить индекс, в котором должен храниться объект.Проще говоря, индекс соответствует пространству имен. После этого мы просматриваем indexValues, Поскольку в настоящее время у нас есть только одно пространство имен, мы напрямую находимset := index[namespace]
, где множествоmap[string]{struct}
, поэтому после того, как набор вставлен в объект, он становится типом ключевого объекта. Таким образом, k индексов — это имя indexFunc, v — это индекс, k индексов — это значение indexFunc(obj), а v — этоmap[string]struct{}
, где k — значение keyFunc(obj), а v — содержимое obj. Это действительно немного сбивает с толку, но это не влияет на наше понимание механизма информера.
func (c *threadSafeMap) updateIndices(oldObj interface{}, newObj interface{}, key string) {
// if we got an old object, we need to remove it before we add it again
if oldObj != nil {
c.deleteFromIndices(oldObj, key)
}
for name, indexFunc := range c.indexers {
indexValues, err := indexFunc(newObj)
if err != nil {
panic(fmt.Errorf("unable to calculate an index entry for key %q on index %q: %v", key, name, err))
}
index := c.indices[name]
if index == nil {
index = Index{}
c.indices[name] = index
}
for _, indexValue := range indexValues {
set := index[indexValue]
if set == nil {
set = sets.String{}
index[indexValue] = set
}
set.Insert(key)
}
}
}
Разобравшись с проблемой хранения, вернемся к sharedIndexInformer и посмотрим, что еще мы не рассмотрели. Индексатор — это интерфейс для получения базового хранилища, а с хранилищем мы разобрались, процессор, resyncCheckPeriod, defaultEventHandlerResyncPeriod и часы также упоминались, а о том, что делает процессор, мы поговорим подробнее позже, cacheMutationDetector фактически не используется , его можно игнорировать; listWatcher, о котором мы также говорили; objectType, запускается, останавливается Само собой разумеется, blockDeltas может приостанавливать обработку событий в дельта-очереди FIFO, позволяя безопасно присоединиться к новому обработчику событий. Теперь у нас остался только контроллер.
type sharedIndexInformer struct {
indexer Indexer
controller Controller
processor *sharedProcessor
cacheMutationDetector CacheMutationDetector
// This block is tracked to handle late initialization of the controller
listerWatcher ListerWatcher
objectType runtime.Object
// resyncCheckPeriod is how often we want the reflector's resync timer to fire so it can call
// shouldResync to check if any of our listeners need a resync.
resyncCheckPeriod time.Duration
// defaultEventHandlerResyncPeriod is the default resync period for any handlers added via
// AddEventHandler (i.e. they don't specify one and just want to use the shared informer's default
// value).
defaultEventHandlerResyncPeriod time.Duration
// clock allows for testability
clock clock.Clock
started, stopped bool
startedLock sync.Mutex
// blockDeltas gives a way to stop all event distribution so that a late event handler
// can safely join the shared informer.
blockDeltas sync.Mutex
}
назад, сейчас жеNewController()
, мы только что проанализировалиInformer()
, а теперь посмотриdeploymentInformer.Informer().HasSynced
. Когда я ранее объяснял рабочий процесс, я сказал, что компонент сначала выполнит список, а затем продолжит просмотр. Завершение действия списка зависит от каждого информатора.HasSynced
правда.
После этого регистрируем callback-функцию события для экземпляра информера:
deploymentInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: controller.handleObject,
UpdateFunc: func(old, new interface{}) {
newDepl := new.(*appsv1.Deployment)
oldDepl := old.(*appsv1.Deployment)
if newDepl.ResourceVersion == oldDepl.ResourceVersion {
// Periodic resync will send update events for all known Deployments.
// Two different versions of the same Deployment will always have different RVs.
return
}
controller.handleObject(new)
},
DeleteFunc: controller.handleObject,
})
Функции обратного вызова делятся на три категории: добавить, обновить и удалить. Каждое действие может зарегистрировать другую функцию обратного вызова. В этом примере регистрация одинакова. Если вы войдетеhandleObject()
, вы можете обнаружить, что все, что он делает, помещаетсяfoo
объект добавлен в контроллерworkqueue
середина. Но мы пока не будем рассматривать эту часть.
пройти черезNewController()
Мы получили объект контроллера, который содержит список, регистратор, набор клиентов. А информер мы инициализировали ранее, поэтому следующим шагом будет запуск контроллера и информера и пусть они работают вместе.
kubeInformerFactory.Start(stopCh)
exampleInformerFactory.Start(stopCh)
if err = controller.Run(2, stopCh); err != nil {
klog.Fatalf("Error running controller: %s", err.Error())
}
Функция, которая запускает информер, - это каждый заводStart()
функция, реальная реализация этой функции находится в client-go/informers/factory.go,
for informerType, informer := range f.informers {
if !f.startedInformers[informerType] {
go informer.Run(stopCh)
f.startedInformers[informerType] = true
}
}
Запускайте каждый информер по отдельности и отмечайте их статус как запущенные.Run()
Функция реализована в client-go/tools/cache/shared_informer.go вfunc (s *sharedIndexInformer) Run(stopCh <-chan struct{})
Эти примеры показывают, что индекс акций информатора, который является общим локальным кешем.Run()
Функция представляет еще один очень важный компонент: дельта-FIFO. Посмотрите на процесс создания:fifo := NewDeltaFIFO(MetaNamespaceKeyFunc, s.indexer)
. Есть два параметра, одинMetaNamespaceKeyFunc
, о котором несколько раз упоминалось выше, состоит в том, чтобы получить объектnamespace/name, другойs.indexer
, — это интерфейс для получения базового хранилища.
func NewDeltaFIFO(keyFunc KeyFunc, knownObjects KeyListerGetter) *DeltaFIFO {
f := &DeltaFIFO{
items: map[string]Deltas{},
queue: []string{},
keyFunc: keyFunc,
knownObjects: knownObjects,
}
f.cond.L = &f.lock
return f
}
Более важное содержимое Deltafifo включаетitems
, queue
иknownObjects
, где предметыmap[string]Deltas{}
, где вводятся понятия delta и knownObjects. Дельта относится к изменению объекта, то есть к приращению ресурсов, что очень легко понять. По нашему интуитивному восприятию мы можем обработать событие, зачем нам knownObjects? Причина в том, что идет ресинхронизация, а в дельта-очереди FIFO много содержимого, если оно не обработано, если в это время выполняется ресинхронизация, дельта удаления некоторых объектов, которые были удалены в etcd будет безвозвратно потерян. На этом этапе knownObjects знает, какие объекты были удалены, и регенерирует дельту удаления, которая не была обработана вовремя. Не будем вдаваться в подробности известных объектов, посмотримDeltas
Детали. Дельты представляют собой набор дельт:type Deltas []Delta
, в то время как структура Дельты действие+контент
// Delta is the type stored by a DeltaFIFO. It tells you what change
// happened, and the object's state after* that change.
//
// [*] Unless the change is a deletion, and then you'll get the final
// state of the object before it was deleted.
type Delta struct {
Type DeltaType
Object interface{}
}
Type
может быть одним из четырех
- Added
- Updated
- Deleted
- Синхронизация: используется только в RESYNC
Соответствует точно три события, прежде чем он придет. Здесь есть вопрос: почему хранятся в Deltafifo Direct Depast Delta Deltas без него? Поскольку он может быть объединен, уменьшая количество событий обработки. Когда пользователь постоянно выполняет множество операций, прежде чем они будут объединены в DELTAS, уменьшают стресс компонента. Подробные разговоры позже о том, как добиться слияния дельты.
Теперь вернитесь кRun()
функция, мы далее вводимsharedIndexInformer
Последний компонент: контроллер. Этот контроллер является не сэмпл-контроллером, а контроллером в информере. После создания дельта-FIFO инициализируйте конфигурацию, требуемую контроллером.
cfg := &Config{
Queue: fifo,
ListerWatcher: s.listerWatcher,
ObjectType: s.objectType,
FullResyncPeriod: s.resyncCheckPeriod,
RetryOnError: false,
ShouldResync: s.processor.shouldResync,
Process: s.HandleDeltas,
}
Fifo передается прозрачно, и теперь он называется очередью; listerWatcher и objectType все прозрачно передаются; все вопросы, связанные с ресинхронизацией, связаны с процессором, который связан с механизмом list&watch; обратите вниманиеconfig.Process
назначить какs.HandleDeltas
Это обработчик дельты, и позже говорилось о том, как он будет работать в связке с threadSafeStore. После завершения инициализации настройте конфигурацию с новым контроллером, после запуска s.processor запустите s.controller. Отмечается, что в порядке отсрочки контроллер получает стоп-сигналstopCh
, который передается с верхнего уровня и используется сэмпл-контроллером, информером и т. д.; вновь созданныйprocessorStopCh
, при завершении программы гарантируется, что контроллер получает stopCh, останавливает контроллер и закрываетprocessorStopCh
, пока процессор проходитwg.StartWithChannel()
Гарантируется, что после закрытия канала процессор также остановится. Об этом говорят комментарии
Separate stop channel because Processor should be stopped strictly after controller
Итак, вопрос в том, что именно делает процессор?wg.StartWithChannel(processorStopCh, s.processor.run)
Запускается SharedProcessor, который сначала добавляет блокировку чтения-записи, блокирует чтение и присоединяется ко всем слушателям. Каждый слушатель является процессорным прослушивателем, а затем отдельноp.wg.Start(listener.run)
иp.wg.Start(listener.pop)
.字面上来说,sharedProcessor将事件分发给各个processor处理,然而但从这边我们还是不知道processor的功能。
Сначала отложим процессор, посмотримs.controller.Run()
что вы наделали.
// Run begins processing items, and will continue until a value is sent down stopCh.
// It's an error to call Run more than once.
// Run blocks; call via go.
func (c *controller) Run(stopCh <-chan struct{}) {
defer utilruntime.HandleCrash()
go func() {
<-stopCh
c.config.Queue.Close()
}()
r := NewReflector(
c.config.ListerWatcher,
c.config.ObjectType,
c.config.Queue,
c.config.FullResyncPeriod,
)
r.ShouldResync = c.config.ShouldResync
r.clock = c.clock
c.reflectorMutex.Lock()
c.reflector = r
c.reflectorMutex.Unlock()
var wg wait.Group
defer wg.Wait()
wg.StartWithChannel(stopCh, r.Run)
wait.Until(c.processLoop, time.Second, stopCh)
}
Здесь представлена новая вещь, отражатель.NewReflector()
В него передаются четыре переменные: listWatcher, objectType, queue, resyncPeriod. Очередь на самом деле представляет собой дельта-FIFO.В сочетании с listWatcher и resyncPeriod мы можем легко догадаться, что рефлектор отвечает за взаимодействие с apiserver для получения изменений в ресурсах. Итак, мы можем продолжитьr.Run
иwait.Until(c.processLoop, time.Second, stopCh)
две функции. Получив сигнал stopCh, остановитьprocessLoop
Концевой отражатель. Первый взглядr.Run
, логика внутри очень простая, зацикливание навсегдаListAndWatch
, он перезапустится, даже если что-то неожиданно пойдет не так.
Механизм ListWatch не анализирует здесь исходный код, в принципе, он передаетlist, err = pager.List(context.Background(), options)
Каждый раз, когда список получает объект, он всегда будет отображать resourceVersion как 0 при выполнении этого списка. проходить послеlistMetaInterface.GetResourceVersion()
Получить resourceVersion и передатьsyncWith()
Замените номер версии в локальном кеше. После того, как список будет заполнен, будет запущена процедура go для периодической повторной синхронизации. После этого он войдет в бесконечный цикл и выполнит операцию наблюдения. И каждый раз после просмотра события запускатьr.watchHandler()
, который получит тип и содержимое события через механизм отражения и вызовет различные функции обратного вызова в зависимости от типа. В коде обновление контента будет своевременно обновляться до дельта-FIFO. То есть через механизм list&watch получаются изменения ресурсов. Следующий код может заметить, что шаблон Type+content точно такой же, какDelta
тип совпадает.
switch event.Type {
case watch.Added:
err := r.store.Add(event.Object)
if err != nil {
utilruntime.HandleError(fmt.Errorf("%s: unable to add watch event object (%#v) to store: %v", r.name, event.Object, err))
}
case watch.Modified:
err := r.store.Update(event.Object)
if err != nil {
utilruntime.HandleError(fmt.Errorf("%s: unable to update watch event object (%#v) to store: %v", r.name, event.Object, err))
}
case watch.Deleted:
// TODO: Will any consumers need access to the "last known
// state", which is passed in event.Object? If so, may need
// to change this.
err := r.store.Delete(event.Object)
if err != nil {
utilruntime.HandleError(fmt.Errorf("%s: unable to delete watch event object (%#v) from store: %v", r.name, event.Object, err))
}
прочитай этоr.Run
,посмотриprocessLoop()
. Он занимает 1 с как цикл и выполняется периодически Логика очень проста, вызовc.config.Queue.Pop(PopProcessFunc(c.config.Process))
, то есть взять элемент из дельта-FIFO и обработать его. Вы можете перевернуть его, вышеупомянутый c.config.Processs.HandleDeltas
. Логика тут тоже очень простая, он взаимодействует с индексатором sharedIndexInformer, то есть кеша, обновляет содержимое, которое мы получаем из Delta FIFO, в нижележащий threadSafeMap, а затем передаетs.processor.distribute()
Распространите сообщение.
case Sync, Added, Updated:
isSync := d.Type == Sync
s.cacheMutationDetector.AddObject(d.Object)
if old, exists, err := s.indexer.Get(d.Object); err == nil && exists {
if err := s.indexer.Update(d.Object); err != nil {
return err
}
s.processor.distribute(updateNotification{oldObj: old, newObj: d.Object}, isSync)
} else {
if err := s.indexer.Add(d.Object); err != nil {
return err
}
s.processor.distribute(addNotification{newObj: d.Object}, isSync)
}
case Deleted:
if err := s.indexer.Delete(d.Object); err != nil {
return err
}
s.processor.distribute(deleteNotification{oldObj: d.Object}, false)
}
В дистрибутиве через sharedProcesser проходитlistener.add(obj)
Распространите этот объект на каждого слушателя. И функция снова выполняетсяp.addCh <- notification
. Увидев это, мы можем, наконец, совместить функции вышеописанного выпрямляющего процессора. Чтобы разобраться с идеями, контроллер (не семпл-контроллер) переупаковывает свой собственный модуль в качестве рефлектора во время инициализации, а рефлектор изменяет изменения ресурса на тип+объект через механизм list&watch.Delta
Поместите его в очередь Delta FIFO, и контроллер пройдетprocessLoop
Получите объект из дельта-очереди FIFO и обновите последний ресурс до threadSafeMa. На самом деле, грубо говоря, рефлектор — это часть контроллера, а контроллер — это часть информера, в итоге мы можем обнаружить, что информер получает события и синхронизирует ресурсы через list&watch, а простая очередь сообщений производится через дельта-FIFO.С одной стороны удобно реализовать режим действие+контент для удобства работыУдалитьТакое поведение, с одной стороны, очередь сообщений сжимает событие, что повышает эффективность обработки.
Теперь вернитесь и посмотрите на процессор. Когда дело доходит до вершины дельты FIFOpop()
и, наконец, инкапсулировать объект какnotifcation
, передать в доп. Напомним, что в функции Run() объекта sharedIndexInformer последний вызов выполняетсяwg.StartWithChannel(processorStopCh, s.processor.run)
, который, в свою очередь, вызывает всех слушателейp.wg.Start(listener.run)
иp.wg.Start(listener.pop)
, сначала посмотрите на функцию pop:
func (p *processorListener) pop() {
defer utilruntime.HandleCrash()
defer close(p.nextCh) // Tell .run() to stop
var nextCh chan<- interface{}
var notification interface{}
for {
select {
case nextCh <- notification:
// Notification dispatched
var ok bool
notification, ok = p.pendingNotifications.ReadOne()
if !ok { // Nothing to pop
nextCh = nil // Disable this select case
}
case notificationToAdd, ok := <-p.addCh:
if !ok {
return
}
if notification == nil { // No notification to pop (and pendingNotifications is empty)
// Optimize the case - skip adding to pendingNotifications
notification = notificationToAdd
nextCh = p.nextCh
} else { // There is already a notification waiting to be dispatched
p.pendingNotifications.WriteOne(notificationToAdd)
}
}
}
}
вышеприведенный абзацpop()
Очень красиво, несколько функций реализованы в одной функции. Когда addCh получает сигнал, то есть после взятия дельты из дельта-FIFO, определяется, пусто ли уведомление. Вообще говоря, уведомление всегда читается из pendingNoditication.Когда буфер пуст, уведомление пусто после обработки текущего события. В этом случае при появлении нового события снова устанавливается уведомление. Когда уведомление не пусто, содержимое уведомления передается в nextCh, а другое уведомление извлекается из буфера. Объяснять очень запутанно.Короче говоря, принцип круговой очереди, которая принимает события в дельта-FIFO через addCh, сохраняет эти события через уведомление и буфер, и отправляет их через nextCh.
Логика nextCh находится вrun()
, вы можете понять, взглянув на следующий код, в соответствии с разными типами событий выполняется различная обработка. Здесь p.handler — это обработчик событий, который мы видели в основной функции проекта.Мы ранее зарегистрировали функции обратного вызова с помощью eventHandler, и эти функции обратного вызова на самом деле вызываются здесь.
case updateNotification:
p.handler.OnUpdate(notification.oldObj, notification.newObj)
case addNotification:
p.handler.OnAdd(notification.newObj)
case deleteNotification:
p.handler.OnDelete(notification.oldObj)
На данный момент мы проанализировали все механизмы информера. Что касается некоторых дополнительных деталей, таких как объединение событий в дельта-FIFO и т. д., то они не будут анализироваться из-за нехватки места. На самом деле упомянутое выше содержание очень простое с верхнего уровня, разработчикам даже не нужно четко понимать лежащий в основе принцип, они могут использовать механизм Информера для программирования и реализации простого образца-контроллера. Дизайн информера.Очень симпатичный.
использовать
С точки зрения неспециалиста, чтобы быстро приступить к программированию механизма информера, достаточно запомнить следующие шаги.
- Зависит от
clientcmd.BuildConfigFromFlags
Создать конфигурацию - передается по конфигурации
NewForConfig
генерировать набор клиентов - по набору клиентов
NewSharedInformerFactory
построить завод - Зарегистрируйте eventHandler с конкретным экземпляром Informer, который вы хотите
- перечислить
factory.Start()
На данный момент вы можете получить все данные в etcd через clientset! Так же есть небольшое замечание, клиентсет может общаться напрямую с аписервером, а должен общаться с аписервером напрямую через клиентсет во время операций записи, при операциях чтения пытаться получить данные из локального кеша через листер, а потому полученный data — это не копия, а ссылка, поэтому сначала сделайте глубокую копию, а потом уже обновляйте.