Недавно присоединился к клубу изучения исходного кода k8s, организованному сообществом облачных разработчиков, и начал изучать базовый исходный код k8s и систематизировать его в виде заметок. Заинтересованные студенты могут присоединиться и учиться вместе. В группе и сообществе есть всевозможные шишки, которые могут помочь вам ответить на ваши вопросы в любое время.GitHub.com/cloud родной…
Важность Informer в k8s повторяться не буду, а сразу перейду к теме.
Поместите сначала диаграмму вызовов
Поскольку исходный код этой части Информера относительно сложен, а ссылка вызова очень длинная, последующий анализ исходного кода будет сосредоточен на этой картинке.
Обзор
В k8s компоненты взаимодействуют через http.Не полагаясь на какое-либо промежуточное программное обеспечение, вам нужно обеспечить надежность, режим реального времени, последовательность и т. д. сообщений? Как это делает k8s? --- Ответ Информер. Остальные компоненты k8s общаются с api-сервером через информер.
Как работает Информер
К отдельным компонентам относятся:
- Рефлектор: используется для мониторинга (отслеживания) указанного ресурса и запуска соответствующего события изменения при изменении отслеживаемого ресурса. И сохранить объект ресурса в локальном кэше DeltaFIFO.
- DeltaFIFO: базовая операция постановки в очередь типа операции объекта ресурса.
- FIFO: очередь в порядке очереди, обеспечивающая такие операции, как добавление, удаление, изменение и проверка объектов ресурсов.
- Dealta: Хранилище объектов ресурсов, которое может сохранять тип операций объектов ресурсов. Например: добавить тип операции, обновить тип операции, удалить тип операции, тип операции синхронизации
- Индексатор: локальное хранилище, в котором хранятся объекты ресурсов и которое имеет собственную функцию индексирования.
- Reflect сохраняет потребляемый ресурсный объект из DeltaFIFO в индексатор.
- Данные в Indexer точно такие же, как и в Etcd, и client-go может читать их локально, уменьшая нагрузку на etcd и API-сервер.
Пример использования информера
- Создавайте объекты clientset через kubernetes.NewForConfig. информер должен взаимодействовать с apiserver через clientset
- Создайте канал для остановки, который используется для уведомления информера о выходе до завершения процесса. Потому что информатор — это постоянно работающая подпрограмма.
- informers.NewSharedInformerFactory создает экземпляры объектов sharedInformer
- Первый параметр — ClientSet.
- Второй параметр — как часто синхронизировать
- Метод Informer может получить объект-информер конкретного ресурса
- Функция AddEventHandler может добавлять методы обратного вызова к объектам и поддерживает три метода обратного вызова для объектов.
- AddFunc: метод обратного вызова срабатывается при создании объекта ресурса
- UpdateFunc: метод обратного вызова срабатывает при обновлении объекта ресурса.
- DeleteFunc: метод обратного вызова срабатывает при удалении объекта ресурса.
- Метод Run запускает текущий информер
// 通过informer机制,实现k8s资源的监控
func informer() {
// 因为informer是一个持久运行的groutine,channel作用:进程退出前通知informer退出
stopChan := make(chan struct{})
defer close(stopChan)
// 创建连接k8s的client对象
clientSet, err := kubernetes.NewForConfig(config)
if err != nil {
log.Printf("init clientset error.")
return
}
// 第一步:创建sharedInformer对象,第二个参数为重新同步数据的间隔时间
sharedInformers := informers.NewSharedInformerFactory(clientSet, time.Minute)
// 第二步:每个资源都有informer对象,这里获取pod资源的informer对象
podInformer := sharedInformers.Core().V1().Pods().Informer()
// 第三步:添加自定义回调函数
podInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
// 添加资源的回调函数,返回的是接口类型,需要强制转换为真正的类型
AddFunc: func(obj interface{}) {
mObj := obj.(v1.Object)
log.Printf("New pod added: %s", mObj.GetName())
},
// 更新资源的回调函数
UpdateFunc: func(oldObj, newObj interface{}) {
oObj := oldObj.(v1.Object)
nObj := newObj.(v1.Object)
log.Printf("%s pod updated to %s", oObj.GetName(), nObj.GetName())
},
// 删除资源的回调函数
DeleteFunc: func(obj interface{}) {
mObj := obj.(v1.Object)
log.Printf("pod deleted from store: %s", mObj.GetName())
},
})
// 第四步:开始运行informer对象
podInformer.Run(stopChan)
}
Информер ресурсов и SharedInformer
В предыдущей демонстрации первым шагом было создание объекта SharedInformer. Давайте сначала представим Informer и SharedInformer.
Информер ресурсов
- В каждом ресурсе реализован механизм Информера, позволяющий отслеживать различные события ресурса.
- Каждый Информер реализует методы Информера и Листера.
type PodInformer interface {
Informer() cache.SharedIndexInformer
Lister() v1.PodLister
}
SharedInformer
Если Informer одного и того же ресурса инстанцируется несколько раз, и каждый Informer использует Reflector, будет запущено слишком много одинаковых ListAndWatches, а слишком много повторяющихся операций сериализации и десериализации вызовут перегрузку api-сервера.
SharedInformer может сделать так, чтобы Informer ресурсов того же типа совместно использовал Reflector. Поле карты определено внутри для хранения всех полей Infromer.
Первым шагом в предыдущей демонстрации является создание SharedInformer,sharedInformers := informers.NewSharedInformerFactory(clientSet, time.Minute)
, объект sharedInformerFactory инициализируется внутри, сначала посмотрите на sharedInformerFactory
Расположение исходного кода: vendor/k8s.io/client-go/informer/factory.go
type sharedInformerFactory struct {
client kubernetes.Interface
namespace string
tweakListOptions internalinterfaces.TweakListOptionsFunc
lock sync.Mutex
defaultResync time.Duration
customResync map[reflect.Type]time.Duration
// 按照类型存放共享的informer
informers map[reflect.Type]cache.SharedIndexInformer
// 这个字段用来追踪informers是否被启动了
// 可以保证Start()方法安全的重复调用多次(幂等性)
startedInformers map[reflect.Type]bool
}
Метод запуска
Компонент Controller-Manager в k8s, метод Run в исходном коде вызывает метод Start SharedInformerFactory
Расположение исходного кода: cmd/kube-controller-manager/app/controllermanager.go
func Run(c *config.CompletedConfig, stopCh <-chan struct{}) error {
...
controllerContext.InformerFactory.Start(controllerContext.Stop)
...
}
Расположение исходного кода: k8s.io/client-go/informers/factory.go
func (f *sharedInformerFactory) Start(stopCh <-chan struct{}) {
...
// 遍历所有的informers
for informerType, informer := range f.informers {
if !f.startedInformers[informerType] {
// 每一种informer启动一个协程,运行Run方法
go informer.Run(stopCh)
f.startedInformers[informerType] = true
}
}
}
Получить Информер
В предыдущей демонстрации после создания объекта sharedInformer вторым шагом является вызовpodInformer := sharedInformers.Core().V1().Pods().Informer()
, получить конкретный экземпляр Informer, приступим к анализу метода Informer
Ключевая логика включает в себя:
- инициализация общего процессора
- Регистрация методов списка и наблюдения: регистрация методов списка и наблюдения для определенного типа ресурса.
- Инициализация индексатора: класс реализации — это класс кеша
Возьмем, к примеру, pod, расположение исходного кода: client-go/informers/core/v1/pod.go.
// 获取pod的informer,内部调用InformerFor,参数需要传入f.defaultInformer
func (f *podInformer) Informer() cache.SharedIndexInformer {
return f.factory.InformerFor(&corev1.Pod{}, f.defaultInformer)
}
func (f *podInformer) defaultInformer(client kubernetes.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer {
// 最后一个参数,初始化indexers
return NewFilteredPodInformer(client, f.namespace, resyncPeriod, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, f.tweakListOptions)
}
func NewFilteredPodInformer(client kubernetes.Interface, namespace string, resyncPeriod time.Duration, indexers cache.Indexers, tweakListOptions internalinterfaces.TweakListOptionsFunc) cache.SharedIndexInformer {
return cache.NewSharedIndexInformer(
// 注册List、Watch方法
&cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
if tweakListOptions != nil {
tweakListOptions(&options)
}
// List方法是该种资源对象的List方法(这里是pod)
return client.CoreV1().Pods(namespace).List(context.TODO(), options)
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
if tweakListOptions != nil {
tweakListOptions(&options)
}
// Watch方法是该种资源对象的Watch方法(这里是pod)
return client.CoreV1().Pods(namespace).Watch(context.TODO(), options)
},
},
&corev1.Pod{},
resyncPeriod,
indexers,
)
}
func NewSharedIndexInformer(lw ListerWatcher, exampleObject runtime.Object, defaultEventHandlerResyncPeriod time.Duration, indexers Indexers) SharedIndexInformer {
realClock := &clock.RealClock{}
sharedIndexInformer := &sharedIndexInformer{
// 这里是processor的初始化
processor: &sharedProcessor{clock: realClock},
// 这里是Indexer的初始化,接口为Indexer,实现类为cache
indexer: NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, indexers),
listerWatcher: lw,
objectType: exampleObject,
resyncCheckPeriod: defaultEventHandlerResyncPeriod,
defaultEventHandlerResyncPeriod: defaultEventHandlerResyncPeriod,
cacheMutationDetector: NewCacheMutationDetector(fmt.Sprintf("%T", exampleObject)),
clock: realClock,
}
return sharedIndexInformer
}
// Index接口,实现类是cache类
func NewIndexer(keyFunc KeyFunc, indexers Indexers) Indexer {
return &cache{
cacheStorage: NewThreadSafeStore(indexers, Indices{}),
keyFunc: keyFunc,
}
}
Зарегистрируйте пользовательскую функцию обратного вызова
После получения объекта Informer третьим шагом является регистрация пользовательской callback-функции для информера.Когда ресурс k8s отправляет изменения, он может реализовывать собственную бизнес-логику.
Давайте проанализируемpodInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{...})
логика
// 开始注册事件处理函数
func (s *sharedIndexInformer) AddEventHandler(handler ResourceEventHandler) {
s.AddEventHandlerWithResyncPeriod(handler, s.defaultEventHandlerResyncPeriod)
}
func (s *sharedIndexInformer) AddEventHandlerWithResyncPeriod(handler ResourceEventHandler, resyncPeriod time.Duration) {
...
// 每一个监听者,都会注册为一个listner实例
// 每个listener中持有一个handler对象,后面事件发生时,框架会调用handler方法,也就走到用户注册的代码逻辑了
listener := newProcessListener(handler, resyncPeriod, determineResyncPeriod(resyncPeriod, s.resyncCheckPeriod), s.clock.Now(), initialBufferSize)
...
// 将listner添加到sharedProcessor中
s.processor.addListener(listener)
for _, item := range s.indexer.List() {
listener.add(addNotification{newObj: item})
}
}
// 将listner添加到sharedProcessor中
func (p *sharedProcessor) addListener(listener *processorListener) {
...
if p.listenersStarted {
// listener后台启动了两个协程,这两个协程很关键,后面会介绍
p.wg.Start(listener.run)
p.wg.Start(listener.pop)
}
}
Метод запуска
В предыдущей демонстрации после получения экземпляра информера последним шагом был вызов метода Run, приступим к анализу логики метода Run. Основная логика включает в себя:
- Инициализация DeltaFIFO
- Инициализация контроллера
- Запустите метод process.Run
- Запустите метод controller.Run
Расположение исходного кода: k8s.io/client-go/tools/cache/shared_informer.go
func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) {
// 初始化DeltaFIFO
fifo := NewDeltaFIFOWithOptions(DeltaFIFOOptions{
KnownObjects: s.indexer,
EmitDeltaTypeReplaced: true,
})
// config初始化
// 这里重点关注ListerWatcher对象和Process对象,Process关联的是HandleDeltas函数
// HandleDeltas是消费增量信息(Delta对象)的核心方法
cfg := &Config{
Queue: fifo,
// ListAndWatch对象
ListerWatcher: s.listerWatcher,
ObjectType: s.objectType,
FullResyncPeriod: s.resyncCheckPeriod,
RetryOnError: false,
ShouldResync: s.processor.shouldResync,
// 注册回调函数 HandleDeltas,资源变更时,存到到本地Indexer
Process: s.HandleDeltas,
WatchErrorHandler: s.watchErrorHandler,
}
// 这里主要是controller的初始化
func() {
s.startedLock.Lock()
defer s.startedLock.Unlock()
// 初始化Controller对象
s.controller = New(cfg)
s.controller.(*controller).clock = s.clock
s.started = true
}()
// s.cacheMutationDetector.Run检查缓存对象是否存在
wg.StartWithChannel(processorStopCh, s.cacheMutationDetector.Run)
// 执行sharedProcessor.run方法
// 这个方法非常重要
wg.StartWithChannel(processorStopCh, s.processor.run)
...
// 调用Controller的Run方法
s.controller.Run(stopCh)
}
Метод Run процесса
func (p *sharedProcessor) run(stopCh <-chan struct{}) {
func() {
...
// sharedProcessor的所有Listner,每个后台启动两个协程
// 分别指向run和pop方法
for _, listener := range p.listeners {
p.wg.Start(listener.run)
p.wg.Start(listener.pop)
}
p.listenersStarted = true
}()
...
}
Метод запуска обработчикаListener
ПроцессорListener представляет собой объект-потребитель.Эта функция выполняется периодически, в основном для получения инкрементной информации, полученной от api-сервера из его собственного канала nextCh, а затем вызывает соответствующие методы обработчика.Метод-обработчик — это метод, передаваемый пользовательский метод.
Здесь нам просто нужно знать, что метод run — это метод-потребитель, отвечающий за потребление событий. Позже мы представим, кто является производителем и кто помещает инкрементную информацию в канал nextCh процессорного списка (фактически метод pop ниже)
func (p *processorListener) run() {
stopCh := make(chan struct{})
wait.Until(func() {
// 消费者方法,不断从通道中获取事件
for next := range p.nextCh {
switch notification := next.(type) {
case updateNotification:
// 调用handler的方法,
p.handler.OnUpdate(notification.oldObj, notification.newObj)
case addNotification:
p.handler.OnAdd(notification.newObj)
case deleteNotification:
p.handler.OnDelete(notification.oldObj)
default:
utilruntime.HandleError(fmt.Errorf("unrecognized notification: %T", next))
}
}
// the only way to get here is if the p.nextCh is empty and closed
close(stopCh)
}, 1*time.Second, stopCh)
}
// OnUpdate方法,内部就是调用demo中注册的UpdateFunc方法
// 其他方法类似
func (r ResourceEventHandlerFuncs) OnUpdate(oldObj, newObj interface{}) {
if r.UpdateFunc != nil {
r.UpdateFunc(oldObj, newObj)
}
}
Поп-метод процессораListener
Реализация этого метода очень сложна, но общая цель очень проста: это создание сообщений в nextCh, а затем представленный ранее метод run может использовать их.
Здесь в основном используется буферный метод вместо получения одного события за раз.
func (p *processorListener) pop() {
...
var nextCh chan<- interface{}
var notification interface{}
for {
select {
// 函数第一次进来,这个notification一定是空的,这个case会被阻塞住
// 当第二个case调用完成后,notification被赋值为notificationToAdd,才会进入到这里
case nextCh <- notification:
// Notification dispatched
var ok bool
notification, ok = p.pendingNotifications.ReadOne()
if !ok { // Nothing to pop
nextCh = nil // Disable this select case
}
// 第一次调用时,会进入这里,首先从addCh中获取数据(后面会介绍谁往addCh中放数据)
case notificationToAdd, ok := <-p.addCh:
if !ok {
return
}
if notification == nil { // No notification to pop (and pendingNotifications is empty)
notification = notificationToAdd
// channel是引用类型,将p.nextCh指向nextCh,对nextCh的操作就是操作p.nextCh
// 这里也解答了前面的run方法里面提到的疑问,谁是nextCh的生产者,往nextCh放入数据
nextCh = p.nextCh
} else { // There is already a notification waiting to be dispatched
p.pendingNotifications.WriteOne(notificationToAdd)
}
}
}
}
В функции pop мы видим, что основной целью является потребление p.addCh, и позже мы подробно представим производителя p.addCh. Вот краткое упоминание. Когда функция наблюдения отслеживает изменение события API-сервера, она запускает функцию HandlerDelta. Эта функция обновит индексатор и вызовет метод распределения, чтобы уведомить всех слушателей о событии. Внутренняя реализация: каждому слушателю addCh помещает данные в этот канал.
Метод запуска контроллера
Ключевая логика внутри метода Run включает в себя:
- Инициализировать объект Reflector
- Вызовите метод Run рефлектора
- Список вызовов для получения всех данных о ресурсах
- Вызов Watch для отслеживания изменений ресурсов в режиме реального времени и постановка их в очередь
- Вызвать метод processLoop контроллера
- Использование данных в очереди
Расположение исходного кода: k8s.io/client-go/tools/cache/controller.go
func (c *controller) Run(stopCh <-chan struct{}) {
// 调用NewReflector初始化一个Reflector
// 必须传入ListWatcher数据接口对象
r := NewReflector(
c.config.ListerWatcher,
c.config.ObjectType,
c.config.Queue,
c.config.FullResyncPeriod,
)
...
// 调用Reflector的Run方法,启动监控,并处理监控事件
wg.StartWithChannel(stopCh, r.Run)
// processLoop负责从DeltaFIFO取出数据并消费
wait.Until(c.processLoop, time.Second, stopCh)
}
Reflector
Здесь сначала приостанавливается анализ исходного кода, а краткое введение в Reflector используется для мониторинга указанных ресурсов k8s и запуска соответствующих событий изменения.
- NewReflector: чтобы создать объект Reflector, вам необходимо передать объект интерфейса данных ListerWatcher.
- Выполнить: запустить мониторинг и обработку событий
Основная функция в прогоне представляет собой listandwatch, в том числе:
- Получить данные списка ресурсов
- Мониторинг объектов ресурсов: групповое кодирование передачи с использованием протокола http
Класс рефлектора
type Reflector struct {
// 名称
name string
expectedTypeName string
// 期望放入缓存store的资源类型
expectedType reflect.Type
// The GVK of the object we expect to place in the store if unstructured.
expectedGVK *schema.GroupVersionKind
// 存放同步监听到的资源,这里是DeltaFIFO类
store Store
// 用来执行List和Watch的对象
listerWatcher ListerWatcher
backoffManager wait.BackoffManager
// resync周期
resyncPeriod time.Duration
ShouldResync func() bool
clock clock.Clock
paginatedResult bool
// 最新一次看到的资源版本号
lastSyncResourceVersion string
isLastSyncResourceVersionUnavailable bool
lastSyncResourceVersionMutex sync.RWMutex
WatchListPageSize int64
watchErrorHandler WatchErrorHandler
}
Основной метод: запустить
Основная логика включает в себя:
- Вызовите метод List, чтобы получить все данные в объекте ресурса.
- Преобразование данных ресурсов в список объектов ресурсов
- Сохраняйте информацию о ресурсах в DeltaFIFO и полностью заменяйте локальный кеш
- Вызовите метод Watch для мониторинга ресурсов
- Вызовите функцию watchHandler для обработки различных событий с часов.
// Run函数
func (r *Reflector) Run(stopCh <-chan struct{}) {
...
wait.BackoffUntil(func() {
// 核心函数:ListAndWatch
if err := r.ListAndWatch(stopCh); err != nil {
r.watchErrorHandler(r, err)
}
}, r.backoffManager, true, stopCh)
...
}
Метод ListAndWatch
// ListAndWatch函数
func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
...
if err := func() error {
...
go func() {
...
pager := pager.New(pager.SimplePageFunc(func(opts metav1.ListOptions) (runtime.Object, error) {
// 调用List方法获取资源对象下所有的数据
return r.listerWatcher.List(opts)
}))
...
}()
...
// 获取资源版本号
resourceVersion = listMetaInterface.GetResourceVersion()
initTrace.Step("Resource version extracted")
// 将资源数据转换为资源对象列表
items, err := meta.ExtractList(list)
// 将资源信息存储到DeltaFIFO中,全量替换本地缓存
// 内部调用了replace方法
if err := r.syncWith(items, resourceVersion); err != nil {
return fmt.Errorf("unable to sync list result: %v", err)
}
// 设置最新的资源版本号
r.setLastSyncResourceVersion(resourceVersion)
return nil
}(); err != nil {
return err
}
go func() {
...
for {
...
// 同步资源
if r.ShouldResync == nil || r.ShouldResync() {
// 调用DeltaFIFO的Resync方法
if err := r.store.Resync(); err != nil {
...
}
}
resyncCh, cleanup = r.resyncChan()
}
}()
for {
...
// 监听资源
w, err := r.listerWatcher.Watch(options)
// 处理handler事件,用户注册的Add,Delete,Update函数
if err := r.watchHandler(start, w, &resourceVersion, resyncerrc, stopCh);
...
}
}
// syncWith函数
func (r *Reflector) syncWith(items []runtime.Object, resourceVersion string) error {
found := make([]interface{}, 0, len(items))
for _, item := range items {
found = append(found, item)
}
// 调用cache.Replace
return r.store.Replace(found, resourceVersion)
}
// cache.Replace
func (c *cache) Replace(list []interface{}, resourceVersion string) error {
items := make(map[string]interface{}, len(list))
for _, item := range list {
key, err := c.keyFunc(item)
if err != nil {
return KeyError{item, err}
}
items[key] = item
}
c.cacheStorage.Replace(items, resourceVersion)
return nil
}
метод watchHandler
Функция watchHandler обрабатывает различные события от часов, все события сохраняются в ResultChan, в том числе: тип события, объект ресурса
func (r *Reflector) watchHandler(start time.Time, w watch.Interface, resourceVersion *string, errc chan error, stopCh <-chan struct{}) error {
...
for {
select {
...
// 获取watch接口中的事件的channel
case event, ok := <-w.ResultChan():
...
switch event.Type {
// 处理Add函数
case watch.Added:
// store是DeltaFIFO类
err := r.store.Add(event.Object)
// 处理Modified函数
case watch.Modified:
err := r.store.Update(event.Object)
// 处理Deleted函数
case watch.Deleted:
err := r.store.Delete(event.Object)
}
*resourceVersion = newResourceVersion
// 设置资源版本
r.setLastSyncResourceVersion(newResourceVersion)
eventCount++
}
}
...
}
DeltaFIFO
Следующий анализ исходного кода будет использовать DeltaFIFO, который будет представлен здесь первым.
DeltaFIFO используется для хранения различных событий, возвращаемых Watch API, в очереди будет один и тот же ресурсный объект с разными типами операций. DeltaFIFO реализует интерфейс Queue, а Queue наследует интерфейс Store.
Исходный путь: vendor/k8s.io/client-go/tools/cache/delta_fifo.go
type DeltaFIFO struct {
// lock/cond protects access to 'items' and 'queue'.
lock sync.RWMutex
cond sync.Cond
// We depend on the property that items in the set are in
// the queue and vice versa, and that all Deltas in this
// map have at least one Delta.
// map结构存储:key是资源对象的key,value是对象的Deltas数组
items map[string]Deltas
// 存储资源对象的key
queue []string
// populated is true if the first batch of items inserted by Replace() has been populated
// or Delete/Add/Update was called first.
populated bool
// initialPopulationCount is the number of items inserted by the first call of Replace()
initialPopulationCount int
// keyFunc is used to make the key used for queued item
// insertion and retrieval, and should be deterministic.
keyFunc KeyFunc
// Index本地存储对象
knownObjects KeyListerGetter
closed bool
closedLock sync.Mutex
}
Основные функции включают в себя:
- метод производителя
- потребительский метод
- Механизм ресинхронизации
метод производителя
После того, как Reflector отслеживает изменения ресурсов, он добавляет информацию об изменении ресурсов, такую как «Добавить», «Удалить» и «Обновить», в DeltaFIFO. То есть производитель очереди, метод выглядит следующим образом, которые вызываются внутренне
Функция входа — r.store.Add(event.Object), которая была представлена в предыдущем watchHandler.
func (f *DeltaFIFO) Add(obj interface{}) error {
...
return f.queueActionLocked(Added, obj)
}
func (f *DeltaFIFO) Update(obj interface{}) error {
...
return f.queueActionLocked(Updated, obj)
}
func (f *DeltaFIFO) Delete(obj interface{}) error {
...
return f.queueActionLocked(Deleted, obj)
}
// 举例说明其中一个处理函数,其他的类似,内部都调用queueActionLocked
func (f *DeltaFIFO) Update(obj interface{}) error {
f.lock.Lock()
defer f.lock.Unlock()
f.populated = true
// 内部调用queueActionLocked
return f.queueActionLocked(Updated, obj)
}
метод queueActionLocked
// 内部主要是封装Delta事件,并加入队列,供消费者消费(HandleDeltas函数)
func (f *DeltaFIFO) queueActionLocked(actionType DeltaType, obj interface{}) error {
// 根据资源对象,得到key,一般是 namespace/name 格式
id, err := f.KeyOf(obj)
// 将watch到的事件类型和资源对象,封装成Delta对象
newDeltas := append(f.items[id], Delta{actionType, obj})
// 去重操作
newDeltas = dedupDeltas(newDeltas)
// 将Delta对象加入到队列中
if len(newDeltas) > 0 {
if _, exists := f.items[id]; !exists {
f.queue = append(f.queue, id)
}
f.items[id] = newDeltas
f.cond.Broadcast()
} else {
delete(f.items, id)
}
return nil
}
Потребительский метод — processLoop
Предыдущий анализ исходного кода, после анализа метода Run рефлектора, следующим шагом является метод processLoop контроллера.
func (c *controller) Run(stopCh <-chan struct{}) {
// 调用NewReflector初始化一个Reflector
// 必须传入ListWatcher数据接口对象
r := NewReflector(
c.config.ListerWatcher,
c.config.ObjectType,
c.config.Queue,
c.config.FullResyncPeriod,
)
...
// 调用Reflector的Run方法,启动监控,并处理监控事件
wg.StartWithChannel(stopCh, r.Run)
// processLoop负责从DeltaFIFO取出数据并消费
wait.Until(c.processLoop, time.Second, stopCh)
}
func (c *controller) processLoop() {
for {
// 从DeltaFIFO队列中取出数据,并交给process处理
// process函数保存在config.Process中,也就是前面传入的 HandleDeltas
obj, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process))
...
}
}
Поп-метод
Метод потребления DeltaFIFO — Pop.Эту функцию необходимо передать в функцию процесса, которая используется для получения и обработки метода обратного вызова объекта.
func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error) {
for {
for len(f.queue) == 0 {
// 当队列为空时,Pop函数阻塞住,知道新的数据入队列才唤醒
// 如果Close函数被调用,closed状态被设置,并且广播
if f.closed {
return nil, ErrFIFOClosed
}
f.cond.Wait()
}
// 走到这里,说明队列中有数据,取出数据
id := f.queue[0]
...
// 将数据交给上层回调函数处理
err := process(item)
// 出错则将数据重新放入队列
if e, ok := err.(ErrRequeue); ok {
f.addIfNotPresent(id, item)
err = e.Err
}
return item, err
}
}
функция процесса: HandleDeltas
- Функция обратного вызова HandleDeltas передается в методе Run.
- Поп-объект, выполняемый внутри processLoop, представляет собой HandleDeltas, переданный выше.
Основная логика включает в себя:
- Обновление локального кэша cacheStorage фактически обновляет структуру данных threadSafeMap.
- Уведомление о событии всем слушателям фактически помещает данные в addCh слушателя для потребления потребителем.
func (s *sharedIndexInformer) HandleDeltas(obj interface{}) error {
// 获取所有的Delta资源
for _, d := range obj.(Deltas) {
// 判断资源类型
switch d.Type {
// 如果是下列类型,将资源存储到Indexer
case Sync, Replaced, Added, Updated:
s.cacheMutationDetector.AddObject(d.Object)
if old, exists, err := s.indexer.Get(d.Object); err == nil && exists {
// indexer的实现类是前面介绍过的cache
if err := s.indexer.Update(d.Object); err != nil {
return err
}
isSync := false
switch {
case d.Type == Sync:
isSync = true
case d.Type == Replaced:
if accessor, err := meta.Accessor(d.Object); err == nil {
if oldAccessor, err := meta.Accessor(old); err == nil {
isSync = accessor.GetResourceVersion() == oldAccessor.GetResourceVersion()
}
}
}
// 将资源对象分发至 SharedInformer 的事件处理函数中
s.processor.distribute(updateNotification{oldObj: old, newObj: d.Object}, isSync)
} else {
if err := s.indexer.Add(d.Object); err != nil {
return err
}
s.processor.distribute(addNotification{newObj: d.Object}, false)
}
case Deleted:
if err := s.indexer.Delete(d.Object); err != nil {
return err
}
s.processor.distribute(deleteNotification{oldObj: d.Object}, false)
}
}
return nil
}
// 更新本地缓存cacheStorage
// 其实就是更新 threadSafeMap 这个数据结构,threadSafeMap的初始化在前面介绍过
func (c *cache) Update(obj interface{}) error {
key, err := c.keyFunc(obj)
if err != nil {
return KeyError{obj, err}
}
c.cacheStorage.Update(key, obj)
return nil
}
// distribute函数
func (p *sharedProcessor) distribute(obj interface{}, sync bool) {
...
if sync {
for _, listener := range p.syncingListeners {
// 核心方法
listener.add(obj)
}
} else {
for _, listener := range p.listeners {
listener.add(obj)
}
}
}
// listener.add方法,这里将事件添加到listener的addCh通道中
// 至此,也回答的前面的问题-- 谁往p.addCh中生产数据
func (p *processorListener) add(notification interface{}) {
// 不同更新类型的对象加入到channel中
// 供给processorListener的Run方法使用
p.addCh <- notification
}
Механизм ресинхронизации
Три шага в методе ListAndWatch:
- List
- Rsync
- Watch
RSync отвечает за синхронизацию объектов ресурсов, хранимых локально индексатором, с DeltaFIFO и за установку типа ресурса на тип синхронизации. Запланированное выполнение в Reflector
func (f *DeltaFIFO) Resync() error {
// 获取indexer本地存储对象
keys := f.knownObjects.ListKeys()
for _, k := range keys {
if err := f.syncKeyLocked(k); err != nil {
return err
}
}
return nil
}
Indexer
Как упоминалось в предыдущем анализе, изменения ресурсов будут сохраняться в локальном индексаторе, который будет представлен здесь.
- Индексатор — это локальное хранилище, используемое client-go для хранения объектов ресурсов и имеющее собственную функцию индексирования.
- Объекты ресурсов, потребляемые Reflector из DeltaFIFO, хранятся в индексаторе.
- Данные индексатора согласуются с Etcd, client-go может легко считывать данные с локального сервера, что снижает нагрузку на API-сервер.
Четыре важные структуры данных
Расположение исходного кода: k8s.io/client-go/tools/cache/index.go
// 存储缓存数据
// type Empty struct{}
// type String map[string]Empty
// 这里的sets.String是用map模拟set,map中的value都是空结构体
type Index map[string]sets.String
// 索引器函数,接收资源对象,返回检索结果列表
type IndexFunc func(obj interface{}) ([]string, error)
// 存储索引器,key为索引器名称,value为索引器实现函数
type Indexers map[string]IndexFunc
// 存储缓存器,key为缓存器名称,value为缓存数据
type Indices map[string]Index
ThreadSafeMap
Индексатор инкапсулирован на основе ThreadSafeMap, давайте сначала взглянем на ThreadSafeMap.
- ThreadSafeMap — это хранилище в памяти, данные не хранятся на диске.
- Добавление, удаление, изменение и проверка будут заблокированы для обеспечения согласованности данных.
- Индексатор и буфер используются внутри
Расположение исходного кода: k8s.io/tools/cache/thread_safe_store.go
type threadSafeMap struct {
lock sync.RWMutex
// map结构存储资源数据
// map中的key是通过keyFunc函数计算得出,默认使用MetaNamespaceFunc函数
// 该函数根据资源对象计算出<namespace>/<name>格式的key
// value是Delta对象,包括Type和Object资源对象
items map[string]interface{}
// 存储索引器,key为索引器名称,value为索引器实现函数
indexers Indexers
// 存储缓存器,key为缓存器名称,value为缓存的资源对象数据
indices Indices
}
// 通过执行索引器函数得到索引结果
// 需要两个参数:索引器名称、需要检索的key
func (c *threadSafeMap) ByIndex(indexName, indexedValue string) ([]interface{}, error) {
// 查找指定的索引器函数
indexFunc := c.indexers[indexName]
if indexFunc == nil {
return nil, fmt.Errorf("Index with name %s does not exist", indexName)
}
// 查找指定的缓存器函数
index := c.indices[indexName]
// 从缓存数据中查找并返回数据
set := index[indexedValue]
list := make([]interface{}, 0, set.Len())
for key := range set {
list = append(list, c.items[key])
}
return list, nil
}
Суммировать
Механизм Informer играет важную роль в k8s, и его исходный код также очень сложен. В процессе обучения необходимо сопоставить картинку в начале статьи, иначе будет легко обойти. Он использует Queue и Channel для разделения различных компонентов. Личный опыт таков: вокруг основной идеи логика может быть понятнее при анализе, то есть: кто помещает данные в канал, кто получает данные из канала.