Иллюстрация базовой реализации механизма обнаружения контейнеров kubernetes

Go

После подтягивания контейнера через kubelet в k8s пользователь может указать метод обнаружения для реализации проверки работоспособности контейнера.В настоящее время поддерживается три метода: TCP, Http и команда.Сегодня мы представим реализацию всего его обнаружения модуль и узнать о его конкретной реализации периодического обнаружения, счетчика, задержки и других конструкций

1. Общий дизайн Разведки

1.1 Потоковая модель

image.pngПотоковая модель зондирования относительно проста: базовые задачи зондирования выполняются с помощью рабочих процессов, управление рабочими процессами осуществляется с помощью Manager, а результаты зондирования кэшируются.

1.2 Периодическое зондирование

image.pngЧтобы сгенерировать таймер в соответствии с периодом каждой задачи обнаружения, вам нужно только прослушать событие таймера.

1.3 Реализация механизма обнаружения

image.pngРеализация механизма зондирования относительно проста, за исключением команд Http и Tcp.Tcp нужно только напрямую связать через net.DialTimeout, в то время как Http создает запрос Http, создавая http.Transport для выполнения операции Do.

Относительно сложным является exec, который сначала генерирует команду в соответствии с переменными среды текущего контейнера, а затем создает команду через контейнер, команду, тайм-аут и т. д. и, наконец, вызывает runtimeService для вызова csi для выполнения команды. .

2. Реализация интерфейса зонда

2.1 Структура основного члена

type prober struct {
    exec execprobe.Prober
    // 我们可以看到针对readiness/liveness会分别启动一个http Transport来进行链接
    readinessHTTP httpprobe.Prober
    livenessHTTP  httpprobe.Prober
    startupHTTP   httpprobe.Prober
    tcp           tcpprobe.Prober
    runner        kubecontainer.ContainerCommandRunner

    // refManager主要是用于获取成员的引用对象
    refManager *kubecontainer.RefManager
    // recorder会负责探测结果事件的构建,并最终传递回 apiserver
    recorder   record.EventRecorder
}

2.2 Обнаружение основного процесса

Основной процесс зондирования в основном расположен в зондовом методе зонда, и его основной процесс разделен на три части.

2.2.1 Получить целевую конфигурацию зонда

func (pb *prober) probe(probeType probeType, pod *v1.Pod, status v1.PodStatus, container v1.Container, containerID kubecontainer.ContainerID) (results.Result, error) {
var probeSpec *v1.Probe
// 根据探活的类型来获取对应位置的探活配置
    switch probeType {
    case readiness:
        probeSpec = container.ReadinessProbe
    case liveness:
        probeSpec = container.LivenessProbe
    case startup:
        probeSpec = container.StartupProbe
    default:
        return results.Failure, fmt.Errorf("unknown probe type: %q", probeType)
    }

2.2.2 Выполнить зонд для записи сообщения об ошибке

Если возвращенная ошибка не является успехом или состоянием предупреждения, будет получен соответствующий эталонный объект, а затем событие будет построено через регистратор, а результат будет отправлен обратно на аписервер.

// 执行探活流程    
result, output, err := pb.runProbeWithRetries(probeType, probeSpec, pod, status, container, containerID, maxProbeRetries)
    
    if err != nil || (result != probe.Success && result != probe.Warning) {
        // // 如果返回的错误,或者不是成功或者警告的状态
        // 则会获取对应的引用对象,然后通过 
        ref, hasRef := pb.refManager.GetRef(containerID)
        if !hasRef {
            klog.Warningf("No ref for container %q (%s)", containerID.String(), ctrName)
        }
        if err != nil {
            klog.V(1).Infof("%s probe for %q errored: %v", probeType, ctrName, err)
            recorder进行事件的构造,发送结果返回apiserver
            if hasRef {
                pb.recorder.Eventf(ref, v1.EventTypeWarning, events.ContainerUnhealthy, "%s probe errored: %v", probeType, err)
            }
        } else { // result != probe.Success
            klog.V(1).Infof("%s probe for %q failed (%v): %s", probeType, ctrName, result, output)
            // recorder进行事件的构造,发送结果返回apiserver
            if hasRef {
                pb.recorder.Eventf(ref, v1.EventTypeWarning, events.ContainerUnhealthy, "%s probe failed: %s", probeType, output)
            }
        }
        return results.Failure, err
    }

2.2.3 Реализация повторной попытки зондирования

func (pb *prober) runProbeWithRetries(probeType probeType, p *v1.Probe, pod *v1.Pod, status v1.PodStatus, container v1.Container, containerID kubecontainer.ContainerID, retries int) (probe.Result, string, error) {
    var err error
    var result probe.Result
    var output string
    for i := 0; i < retries; i++ {
        result, output, err = pb.runProbe(probeType, p, pod, status, container, containerID)
        if err == nil {
            return result, output, nil
        }
    }
    return result, output, err
}

2.2.4 Выполнение зондирования в соответствии с типом зонда

func (pb *prober) runProbe(probeType probeType, p *v1.Probe, pod *v1.Pod, status v1.PodStatus, container v1.Container, containerID kubecontainer.ContainerID) (probe.Result, string, error) {
    timeout := time.Duration(p.TimeoutSeconds) * time.Second
    if p.Exec != nil {
        klog.V(4).Infof("Exec-Probe Pod: %v, Container: %v, Command: %v", pod, container, p.Exec.Command)
        command := kubecontainer.ExpandContainerCommandOnlyStatic(p.Exec.Command, container.Env)
        return pb.exec.Probe(pb.newExecInContainer(container, containerID, command, timeout))
    }
    if p.HTTPGet != nil {
        // 获取协议类型与 http参数信息
        scheme := strings.ToLower(string(p.HTTPGet.Scheme))
        host := p.HTTPGet.Host
        if host == "" {
            host = status.PodIP
        }
        port, err := extractPort(p.HTTPGet.Port, container)
        if err != nil {
            return probe.Unknown, "", err
        }
        path := p.HTTPGet.Path
        klog.V(4).Infof("HTTP-Probe Host: %v://%v, Port: %v, Path: %v", scheme, host, port, path)
        url := formatURL(scheme, host, port, path)
        headers := buildHeader(p.HTTPGet.HTTPHeaders)
        klog.V(4).Infof("HTTP-Probe Headers: %v", headers)
        switch probeType {
        case liveness:
            return pb.livenessHTTP.Probe(url, headers, timeout)
        case startup:
            return pb.startupHTTP.Probe(url, headers, timeout)
        default:
            return pb.readinessHTTP.Probe(url, headers, timeout)
        }
    }
    if p.TCPSocket != nil {
        port, err := extractPort(p.TCPSocket.Port, container)
        if err != nil {
            return probe.Unknown, "", err
        }
        host := p.TCPSocket.Host
        if host == "" {
            host = status.PodIP
        }
        klog.V(4).Infof("TCP-Probe Host: %v, Port: %v, Timeout: %v", host, port, timeout)
        return pb.tcp.Probe(host, port, timeout)
    }
    klog.Warningf("Failed to find probe builder for container: %v", container)
    return probe.Unknown, "", fmt.Errorf("missing probe handler for %s:%s", format.Pod(pod), container.Name)
}

3. рабочий поток

Рабочий рабочий поток выполняет обнаружение, необходимо рассмотреть несколько вопросов: 1. Когда контейнер только что запущен, ему может потребоваться некоторое время ожидания. Например, приложение может выполнить некоторую работу по инициализации и не готово. 2. Если обнаружено, что обнаружение контейнера дает сбой и перезапускается, повторное обнаружение перед запуском не имеет смысла 3. Будь то успех или неудача, могут потребоваться некоторые пороговые значения, чтобы помочь избежать одиночного сбоя с небольшой вероятностью, перезапустить контейнер

3.1 Основные члены

Помимо конфигурации обнаружения, ключевыми параметрами являются в основном параметр onHold, который используется для принятия решения о том, следует ли откладывать обнаружение контейнера, то есть при перезапуске контейнера обнаружение нужно отложить, а resultRun — это счетчик , будь то непрерывный успех или непрерывный отказ. , накапливаются через счетчик, и тогда будет судить, превышен ли заданный порог

type worker struct {
    // 停止channel
    stopCh chan struct{}

    // 包含探针的pod
    pod *v1.Pod

    // 容器探针
    container v1.Container

    // 探针配置
    spec *v1.Probe

    // 探针类型
    probeType probeType

    // The probe value during the initial delay.
    initialValue results.Result

    // 存储探测结果
    resultsManager results.Manager
    probeManager   *manager

    // 此工作进程的最后一个已知容器ID。
    containerID kubecontainer.ContainerID
    // 最后一次探测结果
    lastResult results.Result
    // 探测连续返回相同结果的此时
    resultRun int

    // 探测失败会设置为true不会进行探测 
    onHold bool

    // proberResultsMetricLabels holds the labels attached to this worker
    // for the ProberResults metric by result.
    proberResultsSuccessfulMetricLabels metrics.Labels
    proberResultsFailedMetricLabels     metrics.Labels
    proberResultsUnknownMetricLabels    metrics.Labels
}

3.2 Основной процесс реализации обнаружения

image.png

3.2.1 Прерывание при обнаружении неудачного контейнера

Если состояние текущего контейнера было завершено, нет необходимости его обнаруживать, просто верните

    // 获取当前worker对应pod的状态
    status, ok := w.probeManager.statusManager.GetPodStatus(w.pod.UID)
    if !ok {
        // Either the pod has not been created yet, or it was already deleted.
        klog.V(3).Infof("No status for pod: %v", format.Pod(w.pod))
        return true
    }
    // 如果pod终止worker应该终止
    if status.Phase == v1.PodFailed || status.Phase == v1.PodSucceeded {
        klog.V(3).Infof("Pod %v %v, exiting probe worker",
            format.Pod(w.pod), status.Phase)
        return false
    }

3.2.2 Восстановление датчика задержки

Отложенное обнаружение и восстановление в основном относится к операции перезапуска в случае сбоя обнаружения, и в течение этого периода никакое обнаружение выполняться не будет.Логика восстановления заключается в том, чтобы определить, изменился ли идентификатор соответствующего контейнера, и изменить onHold.

// 通过容器名字获取最新的容器信息    
c, ok := podutil.GetContainerStatus(status.ContainerStatuses, w.container.Name)
    if !ok || len(c.ContainerID) == 0 {
        // Either the container has not been created yet, or it was deleted.
        klog.V(3).Infof("Probe target container not found: %v - %v",
            format.Pod(w.pod), w.container.Name)
        return true // Wait for more information.
    }

    if w.containerID.String() != c.ContainerID {
        // 如果容器改变,则表明重新启动了一个容器
        if !w.containerID.IsEmpty() {
            w.resultsManager.Remove(w.containerID)
        }
        w.containerID = kubecontainer.ParseContainerID(c.ContainerID)
        w.resultsManager.Set(w.containerID, w.initialValue, w.pod)
        // 获取到一个新的容器,则就需要重新开启探测 
        w.onHold = false
    }

    if w.onHold {
        //如果当前设置延缓状态为true,则不进行探测
        return true
    }

3.2.3 Датчик задержки инициализации

Обнаружение задержки инициализации в основном означает, что время работы контейнера Running меньше настроенного InitialDelaySeconds и возвращается напрямую.

    
if int32(time.Since(c.State.Running.StartedAt.Time).Seconds()) < w.spec.InitialDelaySeconds {
        return true
    }

3.2.4 Выполнение логики обнаружения

    result, err := w.probeManager.prober.probe(w.probeType, w.pod, status, w.container, w.containerID)
    if err != nil {
        // Prober error, throw away the result.
        return true
    }

    switch result {
    case results.Success:
        ProberResults.With(w.proberResultsSuccessfulMetricLabels).Inc()
    case results.Failure:
        ProberResults.With(w.proberResultsFailedMetricLabels).Inc()
    default:
        ProberResults.With(w.proberResultsUnknownMetricLabels).Inc()
    }

3.2.5 Накопить количество обнаружений

После накопления счетчика обнаружения он будет судить, превышает ли накопленный счетчик установленный порог, если нет, изменение состояния не будет выполнено.

    if w.lastResult == result {
        w.resultRun++
    } else {
        w.lastResult = result
        w.resultRun = 1
    }

    if (result == results.Failure && w.resultRun < int(w.spec.FailureThreshold)) ||
        (result == results.Success && w.resultRun < int(w.spec.SuccessThreshold)) {
        // Success or failure is below threshold - leave the probe state unchanged.
        // 成功或失败低于阈值-保持探测器状态不变。
        return true
    }

3.2.6 Изменить статус обнаружения

Если статус обнаружения изменен, вам нужно сначала сохранить статус, а если обнаружение не удалось, вам нужно изменить статус onHold на true, чтобы отложить обнаружение, и установить счетчик на 0.

// 这里会修改对应的状态信息    
w.resultsManager.Set(w.containerID, result, w.pod)

    if (w.probeType == liveness || w.probeType == startup) && result == results.Failure {
        // 容器运行liveness/starup检测失败,他们需要重启, 停止探测,直到有新的containerID
        // 这是为了减少命中#21751的机会,其中在容器停止时运行 docker exec可能会导致容器状态损坏
        w.onHold = true
        w.resultRun = 0
    }

3.3 Обнаружение процесса основного цикла

Основной процесс очень прост для выполнения вышеуказанного процесса обнаружения.

func (w *worker) run() {
    // 根据探活周期来构建定时器
    probeTickerPeriod := time.Duration(w.spec.PeriodSeconds) * time.Second

    // If kubelet restarted the probes could be started in rapid succession.
    // Let the worker wait for a random portion of tickerPeriod before probing.
    time.Sleep(time.Duration(rand.Float64() * float64(probeTickerPeriod)))

    probeTicker := time.NewTicker(probeTickerPeriod)

    defer func() {
        // Clean up.
        probeTicker.Stop()
        if !w.containerID.IsEmpty() {
            w.resultsManager.Remove(w.containerID)
        }

        w.probeManager.removeWorker(w.pod.UID, w.container.Name, w.probeType)
        ProberResults.Delete(w.proberResultsSuccessfulMetricLabels)
        ProberResults.Delete(w.proberResultsFailedMetricLabels)
        ProberResults.Delete(w.proberResultsUnknownMetricLabels)
    }()

probeLoop:
    for w.doProbe() {
        // Wait for next probe tick.
        select {
        case <-w.stopCh:
            break probeLoop
        case <-probeTicker.C:
            // continue
        }
    }
}

Я буду здесь сегодня, а завтра я расскажу о реализации probeManager.Если вы поделитесь и перешлете его, даже если вы поддержите меня, он готов к работе.

Идентификатор WeChat: baxiaoshi2020

Обратите внимание на номер бюллетеня, чтобы прочитать больше статей об анализе исходного кода.21天大棚

Другие статьи, чтобы следоватьwww.sreguide.com

Эта статья опубликована в блогеOpenWriteвыпускать