Иллюстрация базовой реализации механизма синхронизации состояния контейнера kubernetes

Go

После того, как Pod назначен на определенный узел Node в K8s, последующая информация об обслуживании состояния поддерживается kubelet на соответствующей машине.Как получить обратную связь о локальном рабочем состоянии в режиме реального времени и уведомить apiserver, является сложностью проектирования.Этот раздел в основном о Понимание внутреннего устройства путем фактического анализа его основной структуры данных с помощью двух процессов обнаружения изменений состояния пода и обнаружения изменений состояния.

1. Государственное управление

1.1 Статическая капсула

image.pngСтатические поды в основном относятся к тем подам, которые не создаются путем обнаружения apiserver, потому что apiserver их не содержит, но также должен поддерживать и получать статус таких подов.k8s разработал концепцию зеркалирования подов, которая фактически предназначена для статических подов отражает под. Основная информация пода согласуется со статическим подом и создается в apiserver. Зеркальный под, который может быть воспринят apiserver, отражает состояние реального статического пода.

1.2 Источник данных о состоянии

image.pngstatusManager — это ключевой компонент для синхронизации состояний. Он должен интегрировать данные в текущей операции Pod и данные, хранящиеся в API-сервере, чтобы определить окончательный переход состояния. Здесь давайте сначала сосредоточимся на том, что нарисовано на диаграмме. Другие состояния будут введены один за другим позже.

2. Согласованность версий

type versionedPodStatus struct {
    status v1.PodStatus
    // 单调递增的版本号(每个pod)
    version uint64
    // Pod name & namespace, for sending updates to API server.
    podName      string
    podNamespace string
}

Для обеспечения синхронизации с информацией об аписервере в Kubelet информация о версии состояния пода сохраняется локально.Помимо сохранения текущих данных о состоянии пода, существует также номер версии, который определяется путем сравнения монотонно возрастающих номеров версий. синхронизация состояния

3. Реализация основного исходного кода

Процесс statusManager на самом деле довольно сложный, сегодня мы поговорим только об одном сценарии, то есть kubelet воспринимает обновление пода через apiserver, а затем разбирает поток данных в statusMangaer по потоку данных этой функции.

3.1 Основные структуры данных

Структуры данных, связанные с состоянием ядра в диспетчере, можно в основном разделить на две категории: обслуживание данных сопоставления (podManager, podStatuses, apiStatusVersions), конвейер передачи данных (podStatusChannel), а остальное — это кублет, который взаимодействует с apiserver и модулем. модуль проверки удаленияDeletionSafety

type manager struct {
    kubeClient clientset.Interface
        // 管理缓存Pod,包含镜像pod和静态pod的映射
    podManager kubepod.Manager
    // 从pod UID映射到相应pod的版本状态信息 。
    podStatuses      map[types.UID]versionedPodStatus
    podStatusesLock  sync.RWMutex
    podStatusChannel chan podStatusSyncRequest
    // 存储镜像pod的版本
    apiStatusVersions map[kubetypes.MirrorPodUID]uint64
    podDeletionSafety PodDeletionSafetyProvider
}

3.2 Установить статус модуля

Настройка состояния пода в основном находится в syncPod в kubelet.После получения изменения события пода он синхронизирует последние данные пода с apiserver, чтобы получить последнее состояние текущего пода на стороне apiserver.

func (m *manager) SetPodStatus(pod *v1.Pod, status v1.PodStatus) {
    m.podStatusesLock.Lock()
    defer m.podStatusesLock.Unlock()

    for _, c := range pod.Status.Conditions {
        if !kubetypes.PodConditionByKubelet(c.Type) {
            klog.Errorf("Kubelet is trying to update pod condition %q for pod %q. "+
                "But it is not owned by kubelet.", string(c.Type), format.Pod(pod))
        }
    }
    // Make sure we're caching a deep copy.
    status = *status.DeepCopy()

    // 如果Pod被删除了则需要强制与apiserver进行信息的同步
    m.updateStatusInternal(pod, status, pod.DeletionTimestamp != nil)
}

3.3 Обновите состояние внутреннего кэша, чтобы генерировать события синхронизации

image.png

3.3.1 Получить статус кеша

    var oldStatus v1.PodStatus
    // 检测之前的本地缓存数据
    cachedStatus, isCached := m.podStatuses[pod.UID]
    if isCached {
        oldStatus = cachedStatus.status
    } else if mirrorPod, ok := m.podManager.GetMirrorPodByPod(pod); ok {
        oldStatus = mirrorPod.Status
    } else {
        oldStatus = pod.Status
    }

3.3.2 Определение статуса контейнера

Определение состояния контейнера в основном предназначено для определения легитимности пересылки статуса завершения контейнера, а на самом деле — для определения возможности перезапуска завершенного контейнера в соответствии с установленной RestartPolicy пода.

    if err := checkContainerStateTransition(oldStatus.ContainerStatuses, status.ContainerStatuses, pod.Spec.RestartPolicy); err != nil {
        klog.Errorf("Status update on pod %v/%v aborted: %v", pod.Namespace, pod.Name, err)
        return false
    }
    if err := checkContainerStateTransition(oldStatus.InitContainerStatuses, status.InitContainerStatuses, pod.Spec.RestartPolicy); err != nil {
        klog.Errorf("Status update on pod %v/%v aborted: %v", pod.Namespace, pod.Name, err)
        return false
    }

3.3.3 Обновление времени последнего перехода PodCondition

Установите время обновления LastTransitionTime, соответствующее PodCondition, через условие в последнем статусе на текущее время.

    // Set ContainersReadyCondition.LastTransitionTime.
    updateLastTransitionTime(&status, &oldStatus, v1.ContainersReady)

    // Set ReadyCondition.LastTransitionTime.
    updateLastTransitionTime(&status, &oldStatus, v1.PodReady)

    // Set InitializedCondition.LastTransitionTime.
    updateLastTransitionTime(&status, &oldStatus, v1.PodInitialized)

    // Set PodScheduledCondition.LastTransitionTime.
    updateLastTransitionTime(&status, &oldStatus, v1.PodScheduled)

3.3.4 Усечение времени корректуры слишком велико

Сначала будет определено максимальное количество байтов каждого контейнера в соответствии с текущим количеством контейнеров, а затем информация сообщения в состоянии завершения в контейнере будет усечена, и одновременно будет проверено время.

    normalizeStatus(pod, &status)

3.3.5 Обнаружение состояния обновления статуса

Если соответствующие данные были кэшированы ранее, а кэшированные данные и текущее состояние не изменились, и нет необходимости форсировать обновление, возвращайтесь напрямую

    if isCached && isPodStatusByKubeletEqual(&cachedStatus.status, &status) && !forceUpdate {
        // 如果不强制更新 ,默认是true此处不会成立
        klog.V(3).Infof("Ignoring same status for pod %q, status: %+v", format.Pod(pod), status)
        return false // No new status.
    }

3.3.6 Создание кэша обновлений синхронных событий

Создание последних данных кэша состояния и увеличение информации о локальной версии.

    // 构建新的状态
    newStatus := versionedPodStatus{
        status:       status,
        version:      cachedStatus.version + 1, // 更新器缓存
        podName:      pod.Name,
        podNamespace: pod.Namespace,
    }
    // 更新新的缓存状态
    m.podStatuses[pod.UID] = newStatus

    select {
    case m.podStatusChannel <- podStatusSyncRequest{pod.UID, newStatus}: // 构建一个新的同步请求
        klog.V(5).Infof("Status Manager: adding pod: %q, with status: (%d, %v) to podStatusChannel",
            pod.UID, newStatus.version, newStatus.status)
        
        return true
    default:
        // Let the periodic syncBatch handle the update if the channel is full.
        // We can't block, since we hold the mutex lock.
        klog.V(4).Infof("Skipping the status update for pod %q for now because the channel is full; status: %+v",
            format.Pod(pod), status)
        return false
    }

3.4 Обновление статуса обнаружения

image.pngСтатус обнаружения на самом деле является рабочим статусом контейнера в поде.Например, если установлено обнаружение готовности, при сбое обнаружения контейнера соответствующая служба будет уведомлена об удалении пода из конечной точки бэкэнда. Давайте разберемся, что такое Kubelet, как уведомить аписервер о запущенном статусе.

3.4.1 Получить текущий статус

func (m *manager) SetContainerReadiness(podUID types.UID, containerID kubecontainer.ContainerID, ready bool) {
    m.podStatusesLock.Lock()
    defer m.podStatusesLock.Unlock()

    // 获取本地的容器
    pod, ok := m.podManager.GetPodByUID(podUID)
    if !ok {
        klog.V(4).Infof("Pod %q has been deleted, no need to update readiness", string(podUID))
        return
    }

    // 获取当前的状态
    oldStatus, found := m.podStatuses[pod.UID]
    if !found {
        klog.Warningf("Container readiness changed before pod has synced: %q - %q",
            format.Pod(pod), containerID.String())
        return
    }

    // 获取当前的容器状态
    containerStatus, _, ok := findContainerStatus(&oldStatus.status, containerID.String())
    if !ok {
        klog.Warningf("Container readiness changed for unknown container: %q - %q",
            format.Pod(pod), containerID.String())
        return
    }

3.4.2 Определение того, изменилось ли состояние

    // 检测前后的就绪状态是否发生改变
    if containerStatus.Ready == ready {
        klog.V(4).Infof("Container readiness unchanged (%v): %q - %q", ready,
            format.Pod(pod), containerID.String())
        return
    }

3.4.3 Изменение состояния готовности контейнера

Получить состояние контейнера, изменить состояние готовности на текущее состояние

    status := *oldStatus.status.DeepCopy()
    containerStatus, _, _ = findContainerStatus(&status, containerID.String())
    containerStatus.Ready = ready

3.4.4 Изменить в соответствии с последним статусом контейнера

Он изменит состояние в соответствующем PodCondition в соответствии с состоянием обнаружения контейнера в текущей среде выполнения и, наконец, вызовет внутреннюю логику обновления.

    updateConditionFunc := func(conditionType v1.PodConditionType, condition v1.PodCondition) {
        conditionIndex := -1
        // 获取Pod对应的PodCondition状态
        for i, condition := range status.Conditions {
            if condition.Type == conditionType {
                conditionIndex = i
                break
            }
        }
        // 修改或追加Pod对应的PodCondition状态
        if conditionIndex != -1 {
            status.Conditions[conditionIndex] = condition
        } else {
            klog.Warningf("PodStatus missing %s type condition: %+v", conditionType, status)
            status.Conditions = append(status.Conditions, condition)
        }
    }
    // 计算Ready状态
    updateConditionFunc(v1.PodReady, GeneratePodReadyCondition(&pod.Spec, status.Conditions, status.ContainerStatuses, status.Phase))
    // 计算容器Ready状态
    updateConditionFunc(v1.ContainersReady, GenerateContainersReadyCondition(&pod.Spec, status.ContainerStatuses, status.Phase))
    m.updateStatusInternal(pod, status, false)

3.5 Запустите задачу фоновой синхронизации

statusManager запустит фоновый поток для обработки синхронных запросов в конвейере обновлений.

func (m *manager) Start() {
    // 省略非核心代码
    go wait.Forever(func() {
        select {
        case syncRequest := <-m.podStatusChannel:
            // 获取最新的状态信息,更新apiserver
            klog.V(5).Infof("Status Manager: syncing pod: %q, with status: (%d, %v) from podStatusChannel",
                syncRequest.podUID, syncRequest.status.version, syncRequest.status.status)
            m.syncPod(syncRequest.podUID, syncRequest.status)
        case <-syncTicker:
            m.syncBatch()
        }
    }, 0)
}

3.6 Синхронизация статуса пода

image.png

3.6.1 Обнаружение состояния синхронизации

Обнаружение условий синхронизации в основном предназначено для определения того, была ли изменена версия зеркального модуля и удален ли модуль в настоящее время.Если модуль не был удален, он вернет false, то есть для модуля, который не был удален. удален, нам все еще нужно продолжать обновлять его статус.

    if !m.needsUpdate(uid, status) {
        klog.V(1).Infof("Status for pod %q is up-to-date; skipping", uid)
        return
    }

3.6.2 Получите последние данные Pod через apiserver

Если информация о поде не получена, просто выйдите напрямую

    pod, err := m.kubeClient.CoreV1().Pods(status.podNamespace).Get(status.podName, metav1.GetOptions{})
    if errors.IsNotFound(err) {
        klog.V(3).Infof("Pod %q does not exist on the server", format.PodDesc(status.podName, status.podNamespace, uid))
        // 如果Pod已经被删除了,就直接退出就行
        return
    }
    if err != nil {
        klog.Warningf("Failed to get status for pod %q: %v", format.PodDesc(status.podName, status.podNamespace, uid), err)
        return
    }

3.6.3 Интерфейс Call Patch для обновления

Это объединит наименьшее состояние с предыдущим состоянием, а затем вызовет kubeClient для изменения состояния apiserver.

    oldStatus := pod.Status.DeepCopy()
    // 更新服务端的状态
    newPod, patchBytes, err := statusutil.PatchPodStatus(m.kubeClient, pod.Namespace, pod.Name, pod.UID, *oldStatus, mergePodStatus(*oldStatus, status.status))
    klog.V(3).Infof("Patch status for pod %q with %q", format.Pod(pod), patchBytes)
    if err != nil {
        klog.Warningf("Failed to update status for pod %q: %v", format.Pod(pod), err)
        return
    }

3.6.4 Обновление информации о версии локального Apiserver

    // 当前是最新的状态
    pod = newPod

    klog.V(3).Infof("Status for pod %q updated successfully: (%d, %+v)", format.Pod(pod), status.version, status.status)
    m.apiStatusVersions[kubetypes.MirrorPodUID(pod.UID)] = status.version

3.6.5 Обнаружение удаления Pod

В основном это завершающий этап, то есть освобождаются ресурсы, соответствующие поду, а затем под на стороне аписервера окончательно удаляется.

// 如果pod的DeletionTimestamp被设置,则对应的Pod需要被删除
if m.canBeDeleted(pod, status.status) {
        deleteOptions := metav1.NewDeleteOptions(0)
        
        deleteOptions.Preconditions = metav1.NewUIDPreconditions(string(pod.UID))
        //  调用apiserver对Pod进行删除
        err = m.kubeClient.CoreV1().Pods(pod.Namespace).Delete(pod.Name, deleteOptions)
        if err != nil {
            klog.Warningf("Failed to delete status for pod %q: %v", format.Pod(pod), err)
            return
        }
        klog.V(3).Infof("Pod %q fully terminated and removed from etcd", format.Pod(pod))
        m.deletePodStatus(uid)
    }

Общий дизайн Tanhuo, наверное, такой. Надеюсь, что большие ребята будут уделять больше внимания и общаться вместе. Адрес электронной книги для чтения исходного кода k8s:woohoo.yuque.com/восемь часов/он…

Идентификатор WeChat: baxiaoshi2020

Обратите внимание на номер бюллетеня, чтобы прочитать больше статей об анализе исходного кода.21天大棚

Другие статьи, чтобы следоватьwww.sreguide.com

Эта статья опубликована в блогеOpenWriteвыпускать