Анализ исходного кода k8s - Механизм информера

Kubernetes

Недавно присоединился к клубу изучения исходного кода k8s, организованному сообществом облачных разработчиков, и начал изучать базовый исходный код k8s и систематизировать его в виде заметок. Заинтересованные студенты могут присоединиться и учиться вместе. В группе и сообществе есть всевозможные шишки, которые могут помочь вам ответить на ваши вопросы в любое время.GitHub.com/cloud родной…

Важность Informer в k8s повторяться не буду, а сразу перейду к теме.

Поместите сначала диаграмму вызовов

HD-адрес

Поскольку исходный код этой части Информера относительно сложен, а ссылка вызова очень длинная, последующий анализ исходного кода будет сосредоточен на этой картинке.

Informer调用关系图

Обзор

В k8s компоненты взаимодействуют через http.Не полагаясь на какое-либо промежуточное программное обеспечение, вам нужно обеспечить надежность, режим реального времени, последовательность и т. д. сообщений? Как это делает k8s? --- Ответ Информер. Остальные компоненты k8s общаются с api-сервером через информер.

Как работает Информер

k8s-informer

К отдельным компонентам относятся:

  • Рефлектор: используется для мониторинга (отслеживания) указанного ресурса и запуска соответствующего события изменения при изменении отслеживаемого ресурса. И сохранить объект ресурса в локальном кэше DeltaFIFO.
  • DeltaFIFO: базовая операция постановки в очередь типа операции объекта ресурса.
    • FIFO: очередь в порядке очереди, обеспечивающая такие операции, как добавление, удаление, изменение и проверка объектов ресурсов.
    • Dealta: Хранилище объектов ресурсов, которое может сохранять тип операций объектов ресурсов. Например: добавить тип операции, обновить тип операции, удалить тип операции, тип операции синхронизации
  • Индексатор: локальное хранилище, в котором хранятся объекты ресурсов и которое имеет собственную функцию индексирования.
    • Reflect сохраняет потребляемый ресурсный объект из DeltaFIFO в индексатор.
    • Данные в Indexer точно такие же, как и в Etcd, и client-go может читать их локально, уменьшая нагрузку на etcd и API-сервер.

Пример использования информера

  • Создавайте объекты clientset через kubernetes.NewForConfig. информер должен взаимодействовать с apiserver через clientset
  • Создайте канал для остановки, который используется для уведомления информера о выходе до завершения процесса. Потому что информатор — это постоянно работающая подпрограмма.
  • informers.NewSharedInformerFactory создает экземпляры объектов sharedInformer
    • Первый параметр — ClientSet.
    • Второй параметр — как часто синхронизировать
  • Метод Informer может получить объект-информер конкретного ресурса
  • Функция AddEventHandler может добавлять методы обратного вызова к объектам и поддерживает три метода обратного вызова для объектов.
    • AddFunc: метод обратного вызова срабатывается при создании объекта ресурса
    • UpdateFunc: метод обратного вызова срабатывает при обновлении объекта ресурса.
    • DeleteFunc: метод обратного вызова срабатывает при удалении объекта ресурса.
  • Метод Run запускает текущий информер
// 通过informer机制,实现k8s资源的监控
func informer() {
  // 因为informer是一个持久运行的groutine,channel作用:进程退出前通知informer退出
  stopChan := make(chan struct{})
  defer close(stopChan)

  // 创建连接k8s的client对象
  clientSet, err := kubernetes.NewForConfig(config)
  if err != nil {
    log.Printf("init clientset error.")
    return
  }

  // 第一步:创建sharedInformer对象,第二个参数为重新同步数据的间隔时间
  sharedInformers := informers.NewSharedInformerFactory(clientSet, time.Minute)
  // 第二步:每个资源都有informer对象,这里获取pod资源的informer对象
  podInformer := sharedInformers.Core().V1().Pods().Informer()
  // 第三步:添加自定义回调函数
  podInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
    // 添加资源的回调函数,返回的是接口类型,需要强制转换为真正的类型
    AddFunc: func(obj interface{}) {
      mObj := obj.(v1.Object)
      log.Printf("New pod added: %s", mObj.GetName())
    },
    // 更新资源的回调函数
    UpdateFunc: func(oldObj, newObj interface{}) {
      oObj := oldObj.(v1.Object)
      nObj := newObj.(v1.Object)
      log.Printf("%s pod updated to %s", oObj.GetName(), nObj.GetName())
    },
    // 删除资源的回调函数
    DeleteFunc: func(obj interface{}) {
      mObj := obj.(v1.Object)
      log.Printf("pod deleted from store: %s", mObj.GetName())
    },
  })
  // 第四步:开始运行informer对象
  podInformer.Run(stopChan)
}

informer数据流转图

Информер ресурсов и SharedInformer

В предыдущей демонстрации первым шагом было создание объекта SharedInformer. Давайте сначала представим Informer и SharedInformer.

Информер ресурсов

  • В каждом ресурсе реализован механизм Информера, позволяющий отслеживать различные события ресурса.
  • Каждый Информер реализует методы Информера и Листера.
type PodInformer interface {
  Informer() cache.SharedIndexInformer
  Lister() v1.PodLister
}

SharedInformer

Если Informer одного и того же ресурса инстанцируется несколько раз, и каждый Informer использует Reflector, будет запущено слишком много одинаковых ListAndWatches, а слишком много повторяющихся операций сериализации и десериализации вызовут перегрузку api-сервера.

SharedInformer может сделать так, чтобы Informer ресурсов того же типа совместно использовал Reflector. Поле карты определено внутри для хранения всех полей Infromer.

Первым шагом в предыдущей демонстрации является создание SharedInformer,sharedInformers := informers.NewSharedInformerFactory(clientSet, time.Minute), объект sharedInformerFactory инициализируется внутри, сначала посмотрите на sharedInformerFactory

Расположение исходного кода: vendor/k8s.io/client-go/informer/factory.go

type sharedInformerFactory struct {
  client           kubernetes.Interface
  namespace        string
  tweakListOptions internalinterfaces.TweakListOptionsFunc
  lock             sync.Mutex
  defaultResync    time.Duration
  customResync     map[reflect.Type]time.Duration

  // 按照类型存放共享的informer
  informers map[reflect.Type]cache.SharedIndexInformer

  // 这个字段用来追踪informers是否被启动了
  // 可以保证Start()方法安全的重复调用多次(幂等性)
  startedInformers map[reflect.Type]bool
}

Метод запуска

Компонент Controller-Manager в k8s, метод Run в исходном коде вызывает метод Start SharedInformerFactory

Расположение исходного кода: cmd/kube-controller-manager/app/controllermanager.go

func Run(c *config.CompletedConfig, stopCh <-chan struct{}) error {
  ...
  controllerContext.InformerFactory.Start(controllerContext.Stop)
  ...
}

Расположение исходного кода: k8s.io/client-go/informers/factory.go

func (f *sharedInformerFactory) Start(stopCh <-chan struct{}) {
  ...
  // 遍历所有的informers
  for informerType, informer := range f.informers {
    if !f.startedInformers[informerType] {
      // 每一种informer启动一个协程,运行Run方法
      go informer.Run(stopCh)
      f.startedInformers[informerType] = true
    }
  }
}

Получить Информер

В предыдущей демонстрации после создания объекта sharedInformer вторым шагом является вызовpodInformer := sharedInformers.Core().V1().Pods().Informer(), получить конкретный экземпляр Informer, приступим к анализу метода Informer

Ключевая логика включает в себя:

  • инициализация общего процессора
  • Регистрация методов списка и наблюдения: регистрация методов списка и наблюдения для определенного типа ресурса.
  • Инициализация индексатора: класс реализации — это класс кеша

Возьмем, к примеру, pod, расположение исходного кода: client-go/informers/core/v1/pod.go.

// 获取pod的informer,内部调用InformerFor,参数需要传入f.defaultInformer
func (f *podInformer) Informer() cache.SharedIndexInformer {
  return f.factory.InformerFor(&corev1.Pod{}, f.defaultInformer)
}

func (f *podInformer) defaultInformer(client kubernetes.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer {
  // 最后一个参数,初始化indexers
  return NewFilteredPodInformer(client, f.namespace, resyncPeriod, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, f.tweakListOptions)
}

func NewFilteredPodInformer(client kubernetes.Interface, namespace string, resyncPeriod time.Duration, indexers cache.Indexers, tweakListOptions internalinterfaces.TweakListOptionsFunc) cache.SharedIndexInformer {
  return cache.NewSharedIndexInformer(
    // 注册List、Watch方法
    &cache.ListWatch{
      ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
        if tweakListOptions != nil {
          tweakListOptions(&options)
        }
        // List方法是该种资源对象的List方法(这里是pod)
        return client.CoreV1().Pods(namespace).List(context.TODO(), options)
      },
      WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
        if tweakListOptions != nil {
          tweakListOptions(&options)
        }
        // Watch方法是该种资源对象的Watch方法(这里是pod)
        return client.CoreV1().Pods(namespace).Watch(context.TODO(), options)
      },
    },
    &corev1.Pod{},
    resyncPeriod,
    indexers,
  )
}

func NewSharedIndexInformer(lw ListerWatcher, exampleObject runtime.Object, defaultEventHandlerResyncPeriod time.Duration, indexers Indexers) SharedIndexInformer {
  realClock := &clock.RealClock{}
  sharedIndexInformer := &sharedIndexInformer{
    // 这里是processor的初始化
    processor:                       &sharedProcessor{clock: realClock},
    // 这里是Indexer的初始化,接口为Indexer,实现类为cache
    indexer:                         NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, indexers),
    listerWatcher:                   lw,
    objectType:                      exampleObject,
    resyncCheckPeriod:               defaultEventHandlerResyncPeriod,
    defaultEventHandlerResyncPeriod: defaultEventHandlerResyncPeriod,
    cacheMutationDetector:           NewCacheMutationDetector(fmt.Sprintf("%T", exampleObject)),
    clock:                           realClock,
  }
  return sharedIndexInformer
}

// Index接口,实现类是cache类
func NewIndexer(keyFunc KeyFunc, indexers Indexers) Indexer {
  return &cache{
    cacheStorage: NewThreadSafeStore(indexers, Indices{}),
    keyFunc:      keyFunc,
  }
}

Зарегистрируйте пользовательскую функцию обратного вызова

После получения объекта Informer третьим шагом является регистрация пользовательской callback-функции для информера.Когда ресурс k8s отправляет изменения, он может реализовывать собственную бизнес-логику. Давайте проанализируемpodInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{...})логика

// 开始注册事件处理函数
func (s *sharedIndexInformer) AddEventHandler(handler ResourceEventHandler) {
  s.AddEventHandlerWithResyncPeriod(handler, s.defaultEventHandlerResyncPeriod)
}

func (s *sharedIndexInformer) AddEventHandlerWithResyncPeriod(handler ResourceEventHandler, resyncPeriod time.Duration) {
  ...
  // 每一个监听者,都会注册为一个listner实例
  // 每个listener中持有一个handler对象,后面事件发生时,框架会调用handler方法,也就走到用户注册的代码逻辑了
  listener := newProcessListener(handler, resyncPeriod, determineResyncPeriod(resyncPeriod, s.resyncCheckPeriod), s.clock.Now(), initialBufferSize)
  ...
  // 将listner添加到sharedProcessor中
  s.processor.addListener(listener)
  for _, item := range s.indexer.List() {
    listener.add(addNotification{newObj: item})
  }
}

// 将listner添加到sharedProcessor中
func (p *sharedProcessor) addListener(listener *processorListener) {
  ...
  if p.listenersStarted {
    // listener后台启动了两个协程,这两个协程很关键,后面会介绍
    p.wg.Start(listener.run)
    p.wg.Start(listener.pop)
  }
}

Метод запуска

В предыдущей демонстрации после получения экземпляра информера последним шагом был вызов метода Run, приступим к анализу логики метода Run. Основная логика включает в себя:

  • Инициализация DeltaFIFO
  • Инициализация контроллера
  • Запустите метод process.Run
  • Запустите метод controller.Run

Расположение исходного кода: k8s.io/client-go/tools/cache/shared_informer.go

func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) {
  // 初始化DeltaFIFO
  fifo := NewDeltaFIFOWithOptions(DeltaFIFOOptions{
    KnownObjects:          s.indexer,
    EmitDeltaTypeReplaced: true,
  })
  // config初始化
  // 这里重点关注ListerWatcher对象和Process对象,Process关联的是HandleDeltas函数
  // HandleDeltas是消费增量信息(Delta对象)的核心方法
  cfg := &Config{
    Queue:            fifo,
    // ListAndWatch对象
    ListerWatcher:    s.listerWatcher,
    ObjectType:       s.objectType,
    FullResyncPeriod: s.resyncCheckPeriod,
    RetryOnError:     false,
    ShouldResync:     s.processor.shouldResync,
    // 注册回调函数 HandleDeltas,资源变更时,存到到本地Indexer
    Process:           s.HandleDeltas,
    WatchErrorHandler: s.watchErrorHandler,
  }
  // 这里主要是controller的初始化
  func() {
    s.startedLock.Lock()
    defer s.startedLock.Unlock()
    // 初始化Controller对象
    s.controller = New(cfg)
    s.controller.(*controller).clock = s.clock
    s.started = true
  }()
  // s.cacheMutationDetector.Run检查缓存对象是否存在
  wg.StartWithChannel(processorStopCh, s.cacheMutationDetector.Run)
  // 执行sharedProcessor.run方法
  // 这个方法非常重要
  wg.StartWithChannel(processorStopCh, s.processor.run)
  ...
  // 调用Controller的Run方法
  s.controller.Run(stopCh)
}

Метод Run процесса

func (p *sharedProcessor) run(stopCh <-chan struct{}) {
  func() {
    ...
    // sharedProcessor的所有Listner,每个后台启动两个协程
    // 分别指向run和pop方法
    for _, listener := range p.listeners {
      p.wg.Start(listener.run)
      p.wg.Start(listener.pop)
    }
    p.listenersStarted = true
  }()
  ...
}

Метод запуска обработчикаListener

ПроцессорListener представляет собой объект-потребитель.Эта функция выполняется периодически, в основном для получения инкрементной информации, полученной от api-сервера из его собственного канала nextCh, а затем вызывает соответствующие методы обработчика.Метод-обработчик — это метод, передаваемый пользовательский метод.

Здесь нам просто нужно знать, что метод run — это метод-потребитель, отвечающий за потребление событий. Позже мы представим, кто является производителем и кто помещает инкрементную информацию в канал nextCh процессорного списка (фактически метод pop ниже)

func (p *processorListener) run() {

  stopCh := make(chan struct{})
  wait.Until(func() {
    // 消费者方法,不断从通道中获取事件
    for next := range p.nextCh {
      switch notification := next.(type) {
      case updateNotification:
        // 调用handler的方法,
        p.handler.OnUpdate(notification.oldObj, notification.newObj)
      case addNotification:
        p.handler.OnAdd(notification.newObj)
      case deleteNotification:
        p.handler.OnDelete(notification.oldObj)
      default:
        utilruntime.HandleError(fmt.Errorf("unrecognized notification: %T", next))
      }
    }
    // the only way to get here is if the p.nextCh is empty and closed
    close(stopCh)
  }, 1*time.Second, stopCh)
}

// OnUpdate方法,内部就是调用demo中注册的UpdateFunc方法
// 其他方法类似
func (r ResourceEventHandlerFuncs) OnUpdate(oldObj, newObj interface{}) {
  if r.UpdateFunc != nil {
    r.UpdateFunc(oldObj, newObj)
  }
}

Поп-метод процессораListener

Реализация этого метода очень сложна, но общая цель очень проста: это создание сообщений в nextCh, а затем представленный ранее метод run может использовать их.

Здесь в основном используется буферный метод вместо получения одного события за раз.

func (p *processorListener) pop() {
  ...
  var nextCh chan<- interface{}
  var notification interface{}
  for {
    select {
    // 函数第一次进来,这个notification一定是空的,这个case会被阻塞住
    // 当第二个case调用完成后,notification被赋值为notificationToAdd,才会进入到这里
    case nextCh <- notification:
      // Notification dispatched
      var ok bool
      notification, ok = p.pendingNotifications.ReadOne()
      if !ok { // Nothing to pop
        nextCh = nil // Disable this select case
      }
    // 第一次调用时,会进入这里,首先从addCh中获取数据(后面会介绍谁往addCh中放数据)
    case notificationToAdd, ok := <-p.addCh:
      if !ok {
        return
      }
      if notification == nil { // No notification to pop (and pendingNotifications is empty)
        notification = notificationToAdd
        // channel是引用类型,将p.nextCh指向nextCh,对nextCh的操作就是操作p.nextCh
        // 这里也解答了前面的run方法里面提到的疑问,谁是nextCh的生产者,往nextCh放入数据
        nextCh = p.nextCh
      } else { // There is already a notification waiting to be dispatched
        p.pendingNotifications.WriteOne(notificationToAdd)
      }
    }
  }
}

В функции pop мы видим, что основной целью является потребление p.addCh, и позже мы подробно представим производителя p.addCh. Вот краткое упоминание. Когда функция наблюдения отслеживает изменение события API-сервера, она запускает функцию HandlerDelta. Эта функция обновит индексатор и вызовет метод распределения, чтобы уведомить всех слушателей о событии. Внутренняя реализация: каждому слушателю addCh помещает данные в этот канал.

Метод запуска контроллера

Ключевая логика внутри метода Run включает в себя:

  • Инициализировать объект Reflector
  • Вызовите метод Run рефлектора
    • Список вызовов для получения всех данных о ресурсах
    • Вызов Watch для отслеживания изменений ресурсов в режиме реального времени и постановка их в очередь
  • Вызвать метод processLoop контроллера
    • Использование данных в очереди

Расположение исходного кода: k8s.io/client-go/tools/cache/controller.go

func (c *controller) Run(stopCh <-chan struct{}) {
  // 调用NewReflector初始化一个Reflector
  // 必须传入ListWatcher数据接口对象
  r := NewReflector(
    c.config.ListerWatcher,
    c.config.ObjectType,
    c.config.Queue,
    c.config.FullResyncPeriod,
  )
  ...
  // 调用Reflector的Run方法,启动监控,并处理监控事件
  wg.StartWithChannel(stopCh, r.Run)

  // processLoop负责从DeltaFIFO取出数据并消费
  wait.Until(c.processLoop, time.Second, stopCh)
}

Reflector

Здесь сначала приостанавливается анализ исходного кода, а краткое введение в Reflector используется для мониторинга указанных ресурсов k8s и запуска соответствующих событий изменения.

  • NewReflector: чтобы создать объект Reflector, вам необходимо передать объект интерфейса данных ListerWatcher.
  • Выполнить: запустить мониторинг и обработку событий

Основная функция в прогоне представляет собой listandwatch, в том числе:

  • Получить данные списка ресурсов
  • Мониторинг объектов ресурсов: групповое кодирование передачи с использованием протокола http

Класс рефлектора

type Reflector struct {
  // 名称
  name string
  expectedTypeName string
  // 期望放入缓存store的资源类型
  expectedType reflect.Type
  // The GVK of the object we expect to place in the store if unstructured.
  expectedGVK *schema.GroupVersionKind
  // 存放同步监听到的资源,这里是DeltaFIFO类
  store Store
  // 用来执行List和Watch的对象
  listerWatcher ListerWatcher
  backoffManager wait.BackoffManager
  // resync周期
  resyncPeriod time.Duration
  ShouldResync func() bool
  clock clock.Clock
  paginatedResult bool
  // 最新一次看到的资源版本号
  lastSyncResourceVersion string
  isLastSyncResourceVersionUnavailable bool
  lastSyncResourceVersionMutex sync.RWMutex
  WatchListPageSize int64
  watchErrorHandler WatchErrorHandler
}

Основной метод: запустить

Основная логика включает в себя:

  • Вызовите метод List, чтобы получить все данные в объекте ресурса.
  • Преобразование данных ресурсов в список объектов ресурсов
  • Сохраняйте информацию о ресурсах в DeltaFIFO и полностью заменяйте локальный кеш
  • Вызовите метод Watch для мониторинга ресурсов
  • Вызовите функцию watchHandler для обработки различных событий с часов.
// Run函数
func (r *Reflector) Run(stopCh <-chan struct{}) {
  ...
  wait.BackoffUntil(func() {
    // 核心函数:ListAndWatch
    if err := r.ListAndWatch(stopCh); err != nil {
      r.watchErrorHandler(r, err)
    }
  }, r.backoffManager, true, stopCh)
  ...
}

Метод ListAndWatch

// ListAndWatch函数
func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
  ...
  if err := func() error {
    ...
    go func() {
      ...
      pager := pager.New(pager.SimplePageFunc(func(opts metav1.ListOptions) (runtime.Object, error) {
        // 调用List方法获取资源对象下所有的数据
        return r.listerWatcher.List(opts)
      }))
      ...
    }()
    ...
    // 获取资源版本号
    resourceVersion = listMetaInterface.GetResourceVersion()
    initTrace.Step("Resource version extracted")
    // 将资源数据转换为资源对象列表
    items, err := meta.ExtractList(list)
    // 将资源信息存储到DeltaFIFO中,全量替换本地缓存
    // 内部调用了replace方法
    if err := r.syncWith(items, resourceVersion); err != nil {
      return fmt.Errorf("unable to sync list result: %v", err)
    }
    // 设置最新的资源版本号
    r.setLastSyncResourceVersion(resourceVersion)
    return nil
  }(); err != nil {
    return err
  }

  go func() {
    ...
    for {
      ...
      // 同步资源
      if r.ShouldResync == nil || r.ShouldResync() {
        // 调用DeltaFIFO的Resync方法
        if err := r.store.Resync(); err != nil {
          ...
        }
      }
      resyncCh, cleanup = r.resyncChan()
    }
  }()

  for {
    ...
    // 监听资源
    w, err := r.listerWatcher.Watch(options)
    // 处理handler事件,用户注册的Add,Delete,Update函数
    if err := r.watchHandler(start, w, &resourceVersion, resyncerrc, stopCh);
    ...
  }
}

// syncWith函数
func (r *Reflector) syncWith(items []runtime.Object, resourceVersion string) error {
  found := make([]interface{}, 0, len(items))
  for _, item := range items {
    found = append(found, item)
  }
  // 调用cache.Replace
  return r.store.Replace(found, resourceVersion)
}

// cache.Replace
func (c *cache) Replace(list []interface{}, resourceVersion string) error {
  items := make(map[string]interface{}, len(list))
  for _, item := range list {
    key, err := c.keyFunc(item)
    if err != nil {
      return KeyError{item, err}
    }
    items[key] = item
  }
  c.cacheStorage.Replace(items, resourceVersion)
  return nil
}

метод watchHandler

Функция watchHandler обрабатывает различные события от часов, все события сохраняются в ResultChan, в том числе: тип события, объект ресурса

func (r *Reflector) watchHandler(start time.Time, w watch.Interface, resourceVersion *string, errc chan error, stopCh <-chan struct{}) error {
  ...
  for {
    select {
    ...
    // 获取watch接口中的事件的channel
    case event, ok := <-w.ResultChan():
      ...
      switch event.Type {
      // 处理Add函数
      case watch.Added:
        // store是DeltaFIFO类
        err := r.store.Add(event.Object)
      // 处理Modified函数
      case watch.Modified:
        err := r.store.Update(event.Object)
      // 处理Deleted函数
      case watch.Deleted:
        err := r.store.Delete(event.Object)
      }
      *resourceVersion = newResourceVersion
      // 设置资源版本
      r.setLastSyncResourceVersion(newResourceVersion)
      eventCount++
    }
  }
  ...
}

DeltaFIFO

Следующий анализ исходного кода будет использовать DeltaFIFO, который будет представлен здесь первым.

DeltaFIFO используется для хранения различных событий, возвращаемых Watch API, в очереди будет один и тот же ресурсный объект с разными типами операций. DeltaFIFO реализует интерфейс Queue, а Queue наследует интерфейс Store.

Исходный путь: vendor/k8s.io/client-go/tools/cache/delta_fifo.go

type DeltaFIFO struct {
  // lock/cond protects access to 'items' and 'queue'.
  lock sync.RWMutex
  cond sync.Cond

  // We depend on the property that items in the set are in
  // the queue and vice versa, and that all Deltas in this
  // map have at least one Delta.

  // map结构存储:key是资源对象的key,value是对象的Deltas数组
  items map[string]Deltas

  // 存储资源对象的key
  queue []string

  // populated is true if the first batch of items inserted by Replace() has been populated
  // or Delete/Add/Update was called first.
  populated bool
  // initialPopulationCount is the number of items inserted by the first call of Replace()
  initialPopulationCount int

  // keyFunc is used to make the key used for queued item
  // insertion and retrieval, and should be deterministic.
  keyFunc KeyFunc

  // Index本地存储对象
  knownObjects KeyListerGetter

  closed     bool
  closedLock sync.Mutex
}

Основные функции включают в себя:

  • метод производителя
  • потребительский метод
  • Механизм ресинхронизации

метод производителя

После того, как Reflector отслеживает изменения ресурсов, он добавляет информацию об изменении ресурсов, такую ​​как «Добавить», «Удалить» и «Обновить», в DeltaFIFO. То есть производитель очереди, метод выглядит следующим образом, которые вызываются внутренне

Функция входа — r.store.Add(event.Object), которая была представлена ​​в предыдущем watchHandler.

func (f *DeltaFIFO) Add(obj interface{}) error {
  ...
  return f.queueActionLocked(Added, obj)
}

func (f *DeltaFIFO) Update(obj interface{}) error {
  ...
  return f.queueActionLocked(Updated, obj)
}

func (f *DeltaFIFO) Delete(obj interface{}) error {
  ...
  return f.queueActionLocked(Deleted, obj)
}

// 举例说明其中一个处理函数,其他的类似,内部都调用queueActionLocked
func (f *DeltaFIFO) Update(obj interface{}) error {
  f.lock.Lock()
  defer f.lock.Unlock()
  f.populated = true
  // 内部调用queueActionLocked
  return f.queueActionLocked(Updated, obj)
}

метод queueActionLocked

// 内部主要是封装Delta事件,并加入队列,供消费者消费(HandleDeltas函数)
func (f *DeltaFIFO) queueActionLocked(actionType DeltaType, obj interface{}) error {
  // 根据资源对象,得到key,一般是 namespace/name 格式
  id, err := f.KeyOf(obj)

  // 将watch到的事件类型和资源对象,封装成Delta对象
  newDeltas := append(f.items[id], Delta{actionType, obj})
  // 去重操作
  newDeltas = dedupDeltas(newDeltas)

  // 将Delta对象加入到队列中
  if len(newDeltas) > 0 {
    if _, exists := f.items[id]; !exists {
      f.queue = append(f.queue, id)
    }
    f.items[id] = newDeltas
    f.cond.Broadcast()
  } else {
    delete(f.items, id)
  }
  return nil
}

Потребительский метод — processLoop

Предыдущий анализ исходного кода, после анализа метода Run рефлектора, следующим шагом является метод processLoop контроллера.

func (c *controller) Run(stopCh <-chan struct{}) {
  // 调用NewReflector初始化一个Reflector
  // 必须传入ListWatcher数据接口对象
  r := NewReflector(
    c.config.ListerWatcher,
    c.config.ObjectType,
    c.config.Queue,
    c.config.FullResyncPeriod,
  )
  ...
  // 调用Reflector的Run方法,启动监控,并处理监控事件
  wg.StartWithChannel(stopCh, r.Run)

  // processLoop负责从DeltaFIFO取出数据并消费
  wait.Until(c.processLoop, time.Second, stopCh)
}
func (c *controller) processLoop() {
  for {
    // 从DeltaFIFO队列中取出数据,并交给process处理
    // process函数保存在config.Process中,也就是前面传入的 HandleDeltas
    obj, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process))
    ...
  }
}

Поп-метод

Метод потребления DeltaFIFO — Pop.Эту функцию необходимо передать в функцию процесса, которая используется для получения и обработки метода обратного вызова объекта.

func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error) {
  for {
    for len(f.queue) == 0 {
      // 当队列为空时,Pop函数阻塞住,知道新的数据入队列才唤醒
      // 如果Close函数被调用,closed状态被设置,并且广播
      if f.closed {
        return nil, ErrFIFOClosed
      }

      f.cond.Wait()
    }
    // 走到这里,说明队列中有数据,取出数据
    id := f.queue[0]
    ...
    // 将数据交给上层回调函数处理
    err := process(item)
    // 出错则将数据重新放入队列
    if e, ok := err.(ErrRequeue); ok {
      f.addIfNotPresent(id, item)
      err = e.Err
    }
    return item, err
  }
}

функция процесса: HandleDeltas

  • Функция обратного вызова HandleDeltas передается в методе Run.
  • Поп-объект, выполняемый внутри processLoop, представляет собой HandleDeltas, переданный выше.

Основная логика включает в себя:

  • Обновление локального кэша cacheStorage фактически обновляет структуру данных threadSafeMap.
  • Уведомление о событии всем слушателям фактически помещает данные в addCh слушателя для потребления потребителем.
func (s *sharedIndexInformer) HandleDeltas(obj interface{}) error {
  // 获取所有的Delta资源
  for _, d := range obj.(Deltas) {
    // 判断资源类型
    switch d.Type {
    // 如果是下列类型,将资源存储到Indexer
    case Sync, Replaced, Added, Updated:
      s.cacheMutationDetector.AddObject(d.Object)
      if old, exists, err := s.indexer.Get(d.Object); err == nil && exists {
        // indexer的实现类是前面介绍过的cache
        if err := s.indexer.Update(d.Object); err != nil {
          return err
        }

        isSync := false
        switch {
        case d.Type == Sync:
          isSync = true
        case d.Type == Replaced:
          if accessor, err := meta.Accessor(d.Object); err == nil {
            if oldAccessor, err := meta.Accessor(old); err == nil {
              isSync = accessor.GetResourceVersion() == oldAccessor.GetResourceVersion()
            }
          }
        }
        // 将资源对象分发至 SharedInformer 的事件处理函数中
        s.processor.distribute(updateNotification{oldObj: old, newObj: d.Object}, isSync)
      } else {
        if err := s.indexer.Add(d.Object); err != nil {
          return err
        }
        s.processor.distribute(addNotification{newObj: d.Object}, false)
      }
    case Deleted:
      if err := s.indexer.Delete(d.Object); err != nil {
        return err
      }
      s.processor.distribute(deleteNotification{oldObj: d.Object}, false)
    }
  }
  return nil
}

// 更新本地缓存cacheStorage
// 其实就是更新 threadSafeMap 这个数据结构,threadSafeMap的初始化在前面介绍过
func (c *cache) Update(obj interface{}) error {
  key, err := c.keyFunc(obj)
  if err != nil {
    return KeyError{obj, err}
  }
  c.cacheStorage.Update(key, obj)
  return nil
}

// distribute函数
func (p *sharedProcessor) distribute(obj interface{}, sync bool) {
  ...
  if sync {
    for _, listener := range p.syncingListeners {
      // 核心方法
      listener.add(obj)
    }
  } else {
    for _, listener := range p.listeners {
      listener.add(obj)
    }
  }
}

// listener.add方法,这里将事件添加到listener的addCh通道中
// 至此,也回答的前面的问题-- 谁往p.addCh中生产数据
func (p *processorListener) add(notification interface{}) {
  // 不同更新类型的对象加入到channel中
  // 供给processorListener的Run方法使用
  p.addCh <- notification
}

Механизм ресинхронизации

Три шага в методе ListAndWatch:

  • List
  • Rsync
  • Watch

RSync отвечает за синхронизацию объектов ресурсов, хранимых локально индексатором, с DeltaFIFO и за установку типа ресурса на тип синхронизации. Запланированное выполнение в Reflector

func (f *DeltaFIFO) Resync() error {
  // 获取indexer本地存储对象
  keys := f.knownObjects.ListKeys()
  for _, k := range keys {
    if err := f.syncKeyLocked(k); err != nil {
      return err
    }
  }
  return nil
}

Indexer

Как упоминалось в предыдущем анализе, изменения ресурсов будут сохраняться в локальном индексаторе, который будет представлен здесь.

  • Индексатор — это локальное хранилище, используемое client-go для хранения объектов ресурсов и имеющее собственную функцию индексирования.
  • Объекты ресурсов, потребляемые Reflector из DeltaFIFO, хранятся в индексаторе.
  • Данные индексатора согласуются с Etcd, client-go может легко считывать данные с локального сервера, что снижает нагрузку на API-сервер.

Четыре важные структуры данных

Расположение исходного кода: k8s.io/client-go/tools/cache/index.go

// 存储缓存数据
// type Empty struct{}
// type String map[string]Empty
// 这里的sets.String是用map模拟set,map中的value都是空结构体
type Index map[string]sets.String

// 索引器函数,接收资源对象,返回检索结果列表
type IndexFunc func(obj interface{}) ([]string, error)

// 存储索引器,key为索引器名称,value为索引器实现函数
type Indexers map[string]IndexFunc

// 存储缓存器,key为缓存器名称,value为缓存数据
type Indices map[string]Index

ThreadSafeMap

Индексатор инкапсулирован на основе ThreadSafeMap, давайте сначала взглянем на ThreadSafeMap.

  • ThreadSafeMap — это хранилище в памяти, данные не хранятся на диске.
  • Добавление, удаление, изменение и проверка будут заблокированы для обеспечения согласованности данных.
  • Индексатор и буфер используются внутри

Расположение исходного кода: k8s.io/tools/cache/thread_safe_store.go

type threadSafeMap struct {
  lock  sync.RWMutex
  // map结构存储资源数据
  // map中的key是通过keyFunc函数计算得出,默认使用MetaNamespaceFunc函数
  // 该函数根据资源对象计算出<namespace>/<name>格式的key
  // value是Delta对象,包括Type和Object资源对象
  items map[string]interface{}
  // 存储索引器,key为索引器名称,value为索引器实现函数
  indexers Indexers
  // 存储缓存器,key为缓存器名称,value为缓存的资源对象数据
  indices Indices
}

// 通过执行索引器函数得到索引结果
// 需要两个参数:索引器名称、需要检索的key
func (c *threadSafeMap) ByIndex(indexName, indexedValue string) ([]interface{}, error) {
  // 查找指定的索引器函数
  indexFunc := c.indexers[indexName]
  if indexFunc == nil {
    return nil, fmt.Errorf("Index with name %s does not exist", indexName)
  }
  // 查找指定的缓存器函数
  index := c.indices[indexName]

  // 从缓存数据中查找并返回数据
  set := index[indexedValue]
  list := make([]interface{}, 0, set.Len())
  for key := range set {
    list = append(list, c.items[key])
  }

  return list, nil
}

Суммировать

Механизм Informer играет важную роль в k8s, и его исходный код также очень сложен. В процессе обучения необходимо сопоставить картинку в начале статьи, иначе будет легко обойти. Он использует Queue и Channel для разделения различных компонентов. Личный опыт таков: вокруг основной идеи логика может быть понятнее при анализе, то есть: кто помещает данные в канал, кто получает данные из канала.