Весь код в этом разделе основан на последней версии 1.13.4.
Начать анализ
Kubernetes соответствует всем компонентам загрузочного кода, apiserver начинает использоватьcobraКомандная строка
1. Завершите настройку параметров;
2. Определить, является ли конфигурация законной;
3. Выполнить финал
Runметод.RunМетод относительно прост1, создать серверную часть;
2. Запустите сервер.
Поскольку apiserver по сути является серверным сервером, ядром всего кода является настройка сервера, включая маршрутизацию, права доступа и взаимодействие с базой данных (и т. д.). Давайте посмотрим, как создается серверная часть.
Создание на стороне сервера
Создание серверной части сосредоточено вCreateServerChainметод. Код метода следующий:
// CreateServerChain creates the apiservers connected via delegation.
// CreateServerChain创建通过委托连接的apiservers,创建一系列的server
func CreateServerChain(completedOptions completedServerRunOptions, stopCh <-chan struct{}) (*genericapiserver.GenericAPIServer, error) {
nodeTunneler, proxyTransport, err := CreateNodeDialer(completedOptions)
if err != nil {
return nil, err
}
// 1.创建kubeAPIServerConfig配置
kubeAPIServerConfig, insecureServingInfo, serviceResolver, pluginInitializer, admissionPostStartHook, err := CreateKubeAPIServerConfig(completedOptions, nodeTunneler, proxyTransport)
if err != nil {
return nil, err
}
// If additional API servers are added, they should be gated.
// 2.判断是否配置了扩展API server,创建apiExtensionsConfig配置
apiExtensionsConfig, err := createAPIExtensionsConfig(*kubeAPIServerConfig.GenericConfig, kubeAPIServerConfig.ExtraConfig.VersionedInformers, pluginInitializer, completedOptions.ServerRunOptions, completedOptions.MasterCount,
serviceResolver, webhook.NewDefaultAuthenticationInfoResolverWrapper(proxyTransport, kubeAPIServerConfig.GenericConfig.LoopbackClientConfig))
if err != nil {
return nil, err
}
// apiExtensionsServer,可扩展的API server
// 3.启动扩展的API server
apiExtensionsServer, err := createAPIExtensionsServer(apiExtensionsConfig, genericapiserver.NewEmptyDelegate())
if err != nil {
return nil, err
}
// 4.启动最核心的kubeAPIServer
kubeAPIServer, err := CreateKubeAPIServer(kubeAPIServerConfig, apiExtensionsServer.GenericAPIServer, admissionPostStartHook)
if err != nil {
return nil, err
}
// otherwise go down the normal path of standing the aggregator up in front of the API server
// this wires up openapi
kubeAPIServer.GenericAPIServer.PrepareRun()
// This will wire up openapi for extension api server
apiExtensionsServer.GenericAPIServer.PrepareRun()
// aggregator comes last in the chain
// 5.聚合层的配置aggregatorConfig
aggregatorConfig, err := createAggregatorConfig(*kubeAPIServerConfig.GenericConfig, completedOptions.ServerRunOptions, kubeAPIServerConfig.ExtraConfig.VersionedInformers, serviceResolver, proxyTransport, pluginInitializer)
if err != nil {
return nil, err
}
// 6.aggregatorServer,聚合服务器,对所有的服务器访问的整合
aggregatorServer, err := createAggregatorServer(aggregatorConfig, kubeAPIServer.GenericAPIServer, apiExtensionsServer.Informers)
if err != nil {
// we don't need special handling for innerStopCh because the aggregator server doesn't create any go routines
return nil, err
}
// 7.启动非安全端口的server
if insecureServingInfo != nil {
insecureHandlerChain := kubeserver.BuildInsecureHandlerChain(aggregatorServer.GenericAPIServer.UnprotectedHandler(), kubeAPIServerConfig.GenericConfig)
if err := insecureServingInfo.Serve(insecureHandlerChain, kubeAPIServerConfig.GenericConfig.RequestTimeout, stopCh); err != nil {
return nil, err
}
}
// 8.返回GenericAPIServer,后续启动安全端口的server
return aggregatorServer.GenericAPIServer, nil
}
Процесс создания в основном включает следующие этапы:
1, в соответствии с конфигурацией apiserver структуры расположения, вызывая методCreateKubeAPIServerConfig;
2. Постройте конфигурацию расширенного API-сервера в соответствии с конфигурацией, и метод вызоваcreateAPIExtensionsConfig;
3. Создайте сервер, включая расширенный API-сервер и собственный API-сервер. Метод вызова:createAPIExtensionsServerиCreateKubeAPIServer. Главное прописать в Контейнер метод маршрутизации каждого обработчика, и следовать ему полностьюgo-restfulПаттерн проектирования заключается в том, что метод обработки прописан в Route, Route по тому же корневому пути прописан в WebService, WebService прописан в Container, а Container отвечает за раздачу. Процесс доступа — Container-->WebService-->Route. более подробныйgo-restfulИспользовать его можно обратиться к своему коду;
4. Настройка и создание сервера агрегации. Главное — интегрировать доступ родного аписервера и расширенного аписервера, и добавить какие-то последующие интерфейсы обработки. метод вызова какcreateAggregatorConfigиcreateAggregatorServer;
5. После завершения создания возвращается настроенная информация о сервере.
Вышеупомянутые шаги, самые основные, это то, как создается ApiServer, то есть как следоватьgo-restfulрежим, добавьте маршруты и соответствующие методы обработки вCreateKubeAPIServerметод в качестве примера,createAPIExtensionsServerпохожий.
Создайте
CreateKubeAPIServerМетоды, как показано ниже
func CreateKubeAPIServer(kubeAPIServerConfig *master.Config, delegateAPIServer genericapiserver.DelegationTarget, admissionPostStartHook genericapiserver.PostStartHookFunc) (*master.Master, error) {
kubeAPIServer, err := kubeAPIServerConfig.Complete().New(delegateAPIServer)
if err != nil {
return nil, err
}
kubeAPIServer.GenericAPIServer.AddPostStartHookOrDie("start-kube-apiserver-admission-initializer", admissionPostStartHook)
return kubeAPIServer, nil
}
пройти черезCompleteметод завершает окончательную легализацию конфигурации,NewМетод генерировать конфигурацию Kubeapiserver, введитеNewметод,
// New returns a new instance of Master from the given config.
// Certain config fields will be set to a default value if unset.
// Certain config fields must be specified, including:
// KubeletClientConfig
// 通过给定的配置,返回一个新的Master实例。对于部分未配置的选项,可以使用默认配置;但是对于KubeletClientConfig这样的配置,必须手动指定
func (c completedConfig) New(delegationTarget genericapiserver.DelegationTarget) (*Master, error) {
if reflect.DeepEqual(c.ExtraConfig.KubeletClientConfig, kubeletclient.KubeletClientConfig{}) {
return nil, fmt.Errorf("Master.New() called with empty config.KubeletClientConfig")
}
// 1.初始化,创建go-restful的Container,初始化apiServerHandler
s, err := c.GenericConfig.New("kube-apiserver", delegationTarget)
if err != nil {
return nil, err
}
if c.ExtraConfig.EnableLogsSupport {
routes.Logs{}.Install(s.Handler.GoRestfulContainer)
}
m := &Master{
GenericAPIServer: s,
}
// install legacy rest storage
// /api开头的版本api注册到Container中去,如Pod、Namespace等资源
if c.ExtraConfig.APIResourceConfigSource.VersionEnabled(apiv1.SchemeGroupVersion) {
legacyRESTStorageProvider := corerest.LegacyRESTStorageProvider{
StorageFactory: c.ExtraConfig.StorageFactory,
ProxyTransport: c.ExtraConfig.ProxyTransport,
KubeletClientConfig: c.ExtraConfig.KubeletClientConfig,
EventTTL: c.ExtraConfig.EventTTL,
ServiceIPRange: c.ExtraConfig.ServiceIPRange,
ServiceNodePortRange: c.ExtraConfig.ServiceNodePortRange,
LoopbackClientConfig: c.GenericConfig.LoopbackClientConfig,
ServiceAccountIssuer: c.ExtraConfig.ServiceAccountIssuer,
ServiceAccountMaxExpiration: c.ExtraConfig.ServiceAccountMaxExpiration,
APIAudiences: c.GenericConfig.Authentication.APIAudiences,
}
m.InstallLegacyAPI(&c, c.GenericConfig.RESTOptionsGetter, legacyRESTStorageProvider)
}
// The order here is preserved in discovery.
// If resources with identical names exist in more than one of these groups (e.g. "deployments.apps"" and "deployments.extensions"),
// the order of this list determines which group an unqualified resource name (e.g. "deployments") should prefer.
// This priority order is used for local discovery, but it ends up aggregated in `k8s.io/kubernetes/cmd/kube-apiserver/app/aggregator.go
// with specific priorities.
// TODO: describe the priority all the way down in the RESTStorageProviders and plumb it back through the various discovery
// handlers that we have.
// /apis开头版本的api注册到Container中
restStorageProviders := []RESTStorageProvider{
auditregistrationrest.RESTStorageProvider{},
authenticationrest.RESTStorageProvider{Authenticator: c.GenericConfig.Authentication.Authenticator, APIAudiences: c.GenericConfig.Authentication.APIAudiences},
authorizationrest.RESTStorageProvider{Authorizer: c.GenericConfig.Authorization.Authorizer, RuleResolver: c.GenericConfig.RuleResolver},
autoscalingrest.RESTStorageProvider{},
batchrest.RESTStorageProvider{},
certificatesrest.RESTStorageProvider{},
coordinationrest.RESTStorageProvider{},
extensionsrest.RESTStorageProvider{},
networkingrest.RESTStorageProvider{},
policyrest.RESTStorageProvider{},
rbacrest.RESTStorageProvider{Authorizer: c.GenericConfig.Authorization.Authorizer},
schedulingrest.RESTStorageProvider{},
settingsrest.RESTStorageProvider{},
storagerest.RESTStorageProvider{},
// keep apps after extensions so legacy clients resolve the extensions versions of shared resource names.
// See https://github.com/kubernetes/kubernetes/issues/42392
appsrest.RESTStorageProvider{},
admissionregistrationrest.RESTStorageProvider{},
eventsrest.RESTStorageProvider{TTL: c.ExtraConfig.EventTTL},
}
m.InstallAPIs(c.ExtraConfig.APIResourceConfigSource, c.GenericConfig.RESTOptionsGetter, restStorageProviders...)
if c.ExtraConfig.Tunneler != nil {
m.installTunneler(c.ExtraConfig.Tunneler, corev1client.NewForConfigOrDie(c.GenericConfig.LoopbackClientConfig).Nodes())
}
m.GenericAPIServer.AddPostStartHookOrDie("ca-registration", c.ExtraConfig.ClientCARegistrationHook.PostStartHook)
return m, nil
}
Содержит следующие шаги:
1. Согласноgo-restfulрежим, вызовc.GenericConfig.NewМетод инициализирует Контейнер, т.е.gorestfulContainerПервоначальный методNewAPIServerHandler.初始化之后,添加路由。
func installAPI(s *GenericAPIServer, c *Config) {
// 添加"/"与"/index.html"路由
if c.EnableIndex {
routes.Index{}.Install(s.listedPathProvider, s.Handler.NonGoRestfulMux)
}
// 添加"/swagger-ui/"路由
if c.SwaggerConfig != nil && c.EnableSwaggerUI {
routes.SwaggerUI{}.Install(s.Handler.NonGoRestfulMux)
}
// 添加"/debug"相关路由
if c.EnableProfiling {
routes.Profiling{}.Install(s.Handler.NonGoRestfulMux)
if c.EnableContentionProfiling {
goruntime.SetBlockProfileRate(1)
}
// so far, only logging related endpoints are considered valid to add for these debug flags.
routes.DebugFlags{}.Install(s.Handler.NonGoRestfulMux, "v", routes.StringFlagPutHandler(logs.GlogSetter))
}
// 添加"/metrics"路由
if c.EnableMetrics {
if c.EnableProfiling {
routes.MetricsWithReset{}.Install(s.Handler.NonGoRestfulMux)
} else {
routes.DefaultMetrics{}.Install(s.Handler.NonGoRestfulMux)
}
}
// 添加"/version"路由
routes.Version{Version: c.Version}.Install(s.Handler.GoRestfulContainer)
if c.EnableDiscovery {
s.Handler.GoRestfulContainer.Add(s.DiscoveryGroupManager.WebService())
}
}
К этому методу добавлено несколько маршрутов, включая /, /swagger-ui, /debug/*, /metrics, /version. Вы можете просмотреть соответствующую информацию, обратившись к apiserver.
3. Добавитьapiмаршрут в начале
4, добавитьapisмаршрут в начале
Добавление маршрута (начиная с api)
Маршруты начинающиеся с api проходят черезInstallLegacyAPIдобавлен метод. ВходитьInstallLegacyAPIМетоды, как показано ниже:
func (m *Master) InstallLegacyAPI(c *completedConfig, restOptionsGetter generic.RESTOptionsGetter, legacyRESTStorageProvider corerest.LegacyRESTStorageProvider) {
legacyRESTStorage, apiGroupInfo, err := legacyRESTStorageProvider.NewLegacyRESTStorage(restOptionsGetter)
if err != nil {
klog.Fatalf("Error building core storage: %v", err)
}
controllerName := "bootstrap-controller"
coreClient := corev1client.NewForConfigOrDie(c.GenericConfig.LoopbackClientConfig)
bootstrapController := c.NewBootstrapController(legacyRESTStorage, coreClient, coreClient, coreClient)
m.GenericAPIServer.AddPostStartHookOrDie(controllerName, bootstrapController.PostStartHook)
m.GenericAPIServer.AddPreShutdownHookOrDie(controllerName, bootstrapController.PreShutdownHook)
if err := m.GenericAPIServer.InstallLegacyAPIGroup(genericapiserver.DefaultLegacyAPIPrefix, &apiGroupInfo); err != nil {
klog.Fatalf("Error in registering group versions: %v", err)
}
}
пройти черезNewLegacyRESTStorageметод для создания каждого ресурсаRESTStorage. RESTStorage — это структура, специально определенная вvendor/k8s.io/apiserver/pkg/registry/generic/registry/store.goНиже структура в основном содержитNewFuncВозвращает конкретную информацию о ресурсе,NewListFuncвозвращает список определенных ресурсов,CreateStrategyполитика, когда созданы конкретные ресурсы,UpdateStrategyСтратегия обновления иDeleteStrategyВажные методы, такие как политики при удалении.
существуетNewLegacyRESTStorageВнутри вы можете увидеть RESTStorage, который создал несколько ресурсов.
NewRESTметод для создания соответствующего ресурса. После создания хранилища всех ресурсов используйтеrestStorageMapТип карты каждого ресурса соответствует соответствующему хранилищу, что удобно для последующего единого планирования маршрутизации.Код выглядит следующим образом:
restStorageMap := map[string]rest.Storage{
"pods": podStorage.Pod,
"pods/attach": podStorage.Attach,
"pods/status": podStorage.Status,
"pods/log": podStorage.Log,
"pods/exec": podStorage.Exec,
"pods/portforward": podStorage.PortForward,
"pods/proxy": podStorage.Proxy,
"pods/binding": podStorage.Binding,
"bindings": podStorage.Binding,
"podTemplates": podTemplateStorage,
"replicationControllers": controllerStorage.Controller,
"replicationControllers/status": controllerStorage.Status,
"services": serviceRest,
"services/proxy": serviceRestProxy,
"services/status": serviceStatusStorage,
"endpoints": endpointsStorage,
"nodes": nodeStorage.Node,
"nodes/status": nodeStorage.Status,
"nodes/proxy": nodeStorage.Proxy,
"events": eventStorage,
"limitRanges": limitRangeStorage,
"resourceQuotas": resourceQuotaStorage,
"resourceQuotas/status": resourceQuotaStatusStorage,
"namespaces": namespaceStorage,
"namespaces/status": namespaceStatusStorage,
"namespaces/finalize": namespaceFinalizeStorage,
"secrets": secretStorage,
"serviceAccounts": serviceAccountStorage,
"persistentVolumes": persistentVolumeStorage,
"persistentVolumes/status": persistentVolumeStatusStorage,
"persistentVolumeClaims": persistentVolumeClaimStorage,
"persistentVolumeClaims/status": persistentVolumeClaimStatusStorage,
"configMaps": configMapStorage,
"componentStatuses": componentstatus.NewStorage(componentStatusStorage{c.StorageFactory}.serversToValidate),
}
if legacyscheme.Scheme.IsVersionRegistered(schema.GroupVersion{Group: "autoscaling", Version: "v1"}) {
restStorageMap["replicationControllers/scale"] = controllerStorage.Scale
}
if legacyscheme.Scheme.IsVersionRegistered(schema.GroupVersion{Group: "policy", Version: "v1beta1"}) {
restStorageMap["pods/eviction"] = podStorage.Eviction
}
if serviceAccountStorage.Token != nil {
restStorageMap["serviceaccounts/token"] = serviceAccountStorage.Token
}
apiGroupInfo.VersionedResourcesStorageMap["v1"] = restStorageMap
Завершите операции RESTStorage для всех ресурсов, начинающихся с API.
После завершения создания начинается установка маршрута, и выполнениеInstallLegacyAPIGroupметод, основная цепочка вызововInstallLegacyAPIGroup-->installAPIResources-->InstallREST-->Install-->registerResourceHandlers, окончательная маршрутизация ядра строится вregisterResourceHandlersвнутри метода. Это очень сложный метод, и код всего метода составляет около 700 строк. Основная функция метода состоит в том, чтобы судить, какие операции (такие как создание, обновление и т. д.) ресурс может выполнять через RESTStorage, построенный на предыдущем шаге, и сохранять соответствующие операции в действии Каждое действие соответствует стандартному остатку операция, такая как создание, соответствует Операция действия — POST, а операция действия, соответствующая обновлению — PUT. Наконец, он проходится по очереди в соответствии с массивом действий, и к каждой операции добавляется метод-обработчик, прописанный в маршруте, и маршрут прописывается в веб-сервисе, что идеально соответствует шаблону проектирования go-restful.
Добавление маршрута (начиная с apis)
Маршруты, начинающиеся с api, в основном предназначены для реализации маршрутизации основных ресурсов, а для других дополнительных ресурсов, таких как связанные с аутентификацией, сетью и другие расширенные ресурсы API, имена начинаются с apis, а запись реализацииInstallAPIs.
InstallAPIsиInstallLegacyAPIGroupОсновное отличие заключается в том, как получается RESTStorage. Для маршрутов в начале API все они в унифицированном формате /api/v1, для маршрутов в начале API он другой, содержит множество различных форматов (в коде Kubernetes называется groupName) , такие как /apis/apps , /apis/certificates.k8s.io и другие нестандартные имена групп. Для этого kubernetes предоставляетRESTStorageProviderИнтерфейс шаблона фабрики
// RESTStorageProvider is a factory type for REST storage.
type RESTStorageProvider interface {
GroupName() string
NewRESTStorage(apiResourceConfigSource serverstorage.APIResourceConfigSource, restOptionsGetter generic.RESTOptionsGetter) (genericapiserver.APIGroupInfo, bool)
}
Все маршрутизируемые ресурсы, начинающиеся с API, должны реализовать этот интерфейс. Метод GroupName() получает имена групп, аналогичные /apis/apps и /apis/certificates.k8s.io, а метод NewRESTStorage получает соответствующую информацию, инкапсулированную в RESTStorage. Реализация интерфейса NewRESTStorage различных ресурсов показана на рисунке:
Запуск на стороне сервера
пройти черезCreateServerChainПосле создания сервера продолжайте звонитьGenericAPIServerМетод Run завершает окончательную работу по запуску. сначала черезPrepareRunЭтот метод завершает отделочные работы по фрезерованию перед началом, и этот метод в основном завершаетSwaggerиOpenAPIРабота по оформлению маршрута(SwaggerиOpenAPIОн в основном содержит все детали и спецификации API Kubernetes и завершает регистрацию маршрута /healthz. После завершения начните окончательные работы по запуску сервера.
Runчерез методNonBlockingRunметод запуска безопасного http-сервера (небезопасный способ запуска вCreateServerChainспособ завершен)
// Run spawns the secure http server. It only returns if stopCh is closed
// or the secure port cannot be listened on initially.
// Run方法会创建一个安全的http server。只有在stopCh关闭或最初无法监听安全端口时返回
func (s preparedGenericAPIServer) Run(stopCh <-chan struct{}) error {
// NonBlockingRun创建一个安全的http server
err := s.NonBlockingRun(stopCh)
if err != nil {
return err
}
<-stopCh
// 接收到stopCh之后的处理动作
err = s.RunPreShutdownHooks()
if err != nil {
return err
}
// Wait for all requests to finish, which are bounded by the RequestTimeout variable.
s.HandlerChainWaitGroup.Wait()
return nil
}
Основная работа по запуску включает в себя настройку различных аутентификаций сертификатов, параметров времени, параметров размера пакетов и т. д., а затем вызовnet/httpМетод запуска библиотеки запускается, а код относительно лаконичен, поэтому он не указан один за другим.
Связанные с разрешением
Существует три основных механизма, связанные с разрешениями в апостревере, а именно в обычном использованномСертификация,Аутентификацияивходной контроль. Для apiserver он в основном предоставляет интерфейсы в стиле остальных, поэтому различные разрешения в конечном итоге сосредоточены на оценке разрешений интерфейса.
в основеkubeAPIServerConfigНапример, вCreateServerChainметод, вызовCreateKubeAPIServerConfigОсновная функция этого метода — создание конфигурации kubeAPIServer. Введите этот метод и вызовитеbuildGenericConfigСоздайте некоторую общую конфигурацию вNewConfigвниз, назадDefaultBuildHandlerChain, этот метод в основном используется для цепной оценки оставшегося интерфейса apiserver, широко известного какfilterОперация записывается первым и проанализирована позже.
func DefaultBuildHandlerChain(apiHandler http.Handler, c *Config) http.Handler {
handler := genericapifilters.WithAuthorization(apiHandler, c.Authorization.Authorizer, c.Serializer)
handler = genericfilters.WithMaxInFlightLimit(handler, c.MaxRequestsInFlight, c.MaxMutatingRequestsInFlight, c.LongRunningFunc)
handler = genericapifilters.WithImpersonation(handler, c.Authorization.Authorizer, c.Serializer)
handler = genericapifilters.WithAudit(handler, c.AuditBackend, c.AuditPolicyChecker, c.LongRunningFunc)
failedHandler := genericapifilters.Unauthorized(c.Serializer, c.Authentication.SupportsBasicAuth)
failedHandler = genericapifilters.WithFailedAuthenticationAudit(failedHandler, c.AuditBackend, c.AuditPolicyChecker)
handler = genericapifilters.WithAuthentication(handler, c.Authentication.Authenticator, failedHandler, c.Authentication.APIAudiences)
handler = genericfilters.WithCORS(handler, c.CorsAllowedOriginList, nil, nil, nil, "true")
handler = genericfilters.WithTimeoutForNonLongRunningRequests(handler, c.LongRunningFunc, c.RequestTimeout)
handler = genericfilters.WithWaitGroup(handler, c.LongRunningFunc, c.HandlerChainWaitGroup)
handler = genericapifilters.WithRequestInfo(handler, c.RequestInfoResolver)
handler = genericfilters.WithPanicRecovery(handler)
return handler
}
После того, как профиль создан, затем создайте работу, перейдите вCreateKubeAPIServerметод, в методе инициализации go-restful Container вы можете увидеть
handlerChainBuilderметод возвращаетDefaultBuildHandlerChainОбертка вокруг метода, переданного в качестве параметраNewAPIServerHandlerвнутри метода. ВходитьNewAPIServerHandlerМетоды, как показано ниже:
func NewAPIServerHandler(name string, s runtime.NegotiatedSerializer, handlerChainBuilder HandlerChainBuilderFn, notFoundHandler http.Handler) *APIServerHandler {
nonGoRestfulMux := mux.NewPathRecorderMux(name)
if notFoundHandler != nil {
nonGoRestfulMux.NotFoundHandler(notFoundHandler)
}
gorestfulContainer := restful.NewContainer()
gorestfulContainer.ServeMux = http.NewServeMux()
gorestfulContainer.Router(restful.CurlyRouter{}) // e.g. for proxy/{kind}/{name}/{*}
gorestfulContainer.RecoverHandler(func(panicReason interface{}, httpWriter http.ResponseWriter) {
logStackOnRecover(s, panicReason, httpWriter)
})
gorestfulContainer.ServiceErrorHandler(func(serviceErr restful.ServiceError, request *restful.Request, response *restful.Response) {
serviceErrorHandler(s, serviceErr, request, response)
})
director := director{
name: name,
goRestfulContainer: gorestfulContainer,
nonGoRestfulMux: nonGoRestfulMux,
}
return &APIServerHandler{
FullHandlerChain: handlerChainBuilder(director),
GoRestfulContainer: gorestfulContainer,
NonGoRestfulMux: nonGoRestfulMux,
Director: director,
}
}
конфигурация путем установкиdirectorпередается как параметр вhandlerChainBuilderВ методе обратного вызова завершите регистрацию обработчика gorestfulContainer. По сути, директор — это переменная, которая реализует http.Handler. Поэтому вся логика обработки заключается в том, чтобы передать директор типа http.Handler в качестве параметра в цепочкуfilterизDefaultBuildHandlerChainвнутри метода. пройти черезDefaultBuildHandlerChainза каждый шагfilterоперации, полный контроль разрешений и другие операции. как пройтиnet/httpреализация пакетаfilterфункцию, вы можете обратиться кэта статья. сделано аналогичноfilterПосле функции следует выполнить работу по запуску, включая проверку сертификата, аутентификацию TLS и другую работу, поэтому я не буду вдаваться в подробности. В основном смотреть наfilterизDefaultBuildHandlerChainМетод заключается в том, как обрабатывать операцию аутентификации интерфейса.
запуск RBAC
RBAC может быть самым важным и часто используемым в Kubernetes. существуетDefaultBuildHandlerChainметод, позвонивgenericapifilters.WithAuthorizationметод, который реализует разрешения на каждом интерфейсеfilterработать.WithAuthorizationМетоды, как показано ниже
func WithAuthorization(handler http.Handler, a authorizer.Authorizer, s runtime.NegotiatedSerializer) http.Handler {
if a == nil {
klog.Warningf("Authorization is disabled")
return handler
}
return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
ctx := req.Context()
ae := request.AuditEventFrom(ctx)
attributes, err := GetAuthorizerAttributes(ctx)
if err != nil {
responsewriters.InternalError(w, req, err)
return
}
authorized, reason, err := a.Authorize(attributes)
// an authorizer like RBAC could encounter evaluation errors and still allow the request, so authorizer decision is checked before error here.
if authorized == authorizer.DecisionAllow {
audit.LogAnnotation(ae, decisionAnnotationKey, decisionAllow)
audit.LogAnnotation(ae, reasonAnnotationKey, reason)
handler.ServeHTTP(w, req)
return
}
if err != nil {
audit.LogAnnotation(ae, reasonAnnotationKey, reasonError)
responsewriters.InternalError(w, req, err)
return
}
klog.V(4).Infof("Forbidden: %#v, Reason: %q", req.RequestURI, reason)
audit.LogAnnotation(ae, decisionAnnotationKey, decisionForbid)
audit.LogAnnotation(ae, reasonAnnotationKey, reason)
responsewriters.Forbidden(ctx, attributes, w, req, reason, s)
})
}
1. звонокGetAuthorizerAttributesМетод получает различные значения атрибутов конфигурации;
2, звонитеAuthorizeМетод оценивает, переданы ли полномочия, и другой орган реализует свой интерфейс для выполнения задачи аутентификации;
3. Если аутентификация успешно пройдена, позвоните
handler.ServeHTTPметод перехода к следующему шагуfilterОперация; в противном случае верните сообщение об ошибке напрямую.Взяв в качестве примера RBAC,
Authorizeокончательный вызов методаVisitRulesForМетод реализует оценку разрешений, и метод находится вkubernetes/pkg/registry/rbac/validation/rule.goв файле.VisitRulesForОсновной код выглядит следующим образом
func (r *DefaultRuleResolver) VisitRulesFor(user user.Info, namespace string, visitor func(source fmt.Stringer, rule *rbacv1.PolicyRule, err error) bool) {
if clusterRoleBindings, err := r.clusterRoleBindingLister.ListClusterRoleBindings(); err != nil {
if !visitor(nil, nil, err) {
return
}
} else {
sourceDescriber := &clusterRoleBindingDescriber{}
for _, clusterRoleBinding := range clusterRoleBindings {
subjectIndex, applies := appliesTo(user, clusterRoleBinding.Subjects, "")
if !applies {
continue
}
rules, err := r.GetRoleReferenceRules(clusterRoleBinding.RoleRef, "")
if err != nil {
if !visitor(nil, nil, err) {
return
}
continue
}
sourceDescriber.binding = clusterRoleBinding
sourceDescriber.subject = &clusterRoleBinding.Subjects[subjectIndex]
for i := range rules {
if !visitor(sourceDescriber, &rules[i], nil) {
return
}
}
}
}
if len(namespace) > 0 {
if roleBindings, err := r.roleBindingLister.ListRoleBindings(namespace); err != nil {
if !visitor(nil, nil, err) {
return
}
} else {
sourceDescriber := &roleBindingDescriber{}
for _, roleBinding := range roleBindings {
subjectIndex, applies := appliesTo(user, roleBinding.Subjects, namespace)
if !applies {
continue
}
rules, err := r.GetRoleReferenceRules(roleBinding.RoleRef, namespace)
if err != nil {
if !visitor(nil, nil, err) {
return
}
continue
}
sourceDescriber.binding = roleBinding
sourceDescriber.subject = &roleBinding.Subjects[subjectIndex]
for i := range rules {
if !visitor(sourceDescriber, &rules[i], nil) {
return
}
}
}
}
}
}
Основная работа заключается вclusterRoleBindingа такжеroleBindingСудя по настроенным ресурсам относительно понятно, что в принципе совпадает с нашей идеей использования RBAC.
операции с базой данных
Взаимодействие между ApiServer и базой данных в основном относится к взаимодействию с etcd. Все компоненты Kubernetes напрямую не взаимодействуют с etcd, а завершают финальное размещение данных, запрашивая apiserver, а apiserver взаимодействует с etcd.
Как упоминалось в предыдущей реализации маршрутизации, внутренние данные, соответствующие обработчику, окончательно реализованному API-сервером,Storeструктура сохраняется. Вот пример маршрута, который начинается с API, вNewLegacyRESTStorageметод, черезNewRESTилиNewStorageХранилище, соответствующее различным ресурсам, будет создано дляendpointsНапример, сгенерированный метод выглядит следующим образом
func NewREST(optsGetter generic.RESTOptionsGetter) *REST {
store := &genericregistry.Store{
NewFunc: func() runtime.Object { return &api.Endpoints{} },
NewListFunc: func() runtime.Object { return &api.EndpointsList{} },
DefaultQualifiedResource: api.Resource("endpoints"),
CreateStrategy: endpoint.Strategy,
UpdateStrategy: endpoint.Strategy,
DeleteStrategy: endpoint.Strategy,
TableConvertor: printerstorage.TableConvertor{TablePrinter: printers.NewTablePrinter().With(printersinternal.AddHandlers)},
}
options := &generic.StoreOptions{RESTOptions: optsGetter}
if err := store.CompleteWithOptions(options); err != nil {
panic(err) // TODO: Propagate error up
}
return &REST{store}
}
В основном см.CompleteWithOptionsметод, вCompleteWithOptionsметод, вызовите RESTOptionsGetRESTOptionsметод, который вызывается в свою очередьStorageWithCacher-->NewRawStorage-->CreateМетод создает внутреннее хранилище для окончательной зависимости:
// Create creates a storage backend based on given config.
func Create(c storagebackend.Config) (storage.Interface, DestroyFunc, error) {
switch c.Type {
case "etcd2":
return nil, nil, fmt.Errorf("%v is no longer a supported storage backend", c.Type)
case storagebackend.StorageTypeUnset, storagebackend.StorageTypeETCD3:
return newETCD3Storage(c)
default:
return nil, nil, fmt.Errorf("unknown storage type: %s", c.Type)
}
}
Видно, что черезCreateМетод заключается в создании серверной версии etcd для etcd2 или etcd3 Текущая версия по умолчанию — etcd3.
После того, как вы закончите создание соответствующей памяти, следующим шагом является работа соответствующего обработчика метода обратного хранения и окончательной реализации связывания (обработчик для обработки данных, необходимых от окончательного блюда).
Помните, что есть более длинный методregisterResourceHandlers, используемый для обработки определенных маршрутов обработчиков. Вернемся к методу снова,
POSTНапример, метод соответствует операции Create.handlerВызов параметра в конечном итоге перейдет кcreateHandlerметод, вkubernetes/vendor/k8s.io/apiserver/pkg/endpoints/handlers/crete.goВниз. Основным шагом является вызов метода Createkubernetes/staging/src/k8s.io/apiserver/pkg/registry/generic/registry/store.goвнизCreateметод. Этот метод в основном включаетBeforeCreate,Storage.Create,AfterCreateа такжеDecoratorи другие основные методы. соответствуетPOSTОперация, самый важный способStorage.Create. Поскольку в настоящее время используется в основном etcd3, метод реализации выглядит следующим образом.
func (s *store) Create(ctx context.Context, key string, obj, out runtime.Object, ttl uint64) error {
if version, err := s.versioner.ObjectResourceVersion(obj); err == nil && version != 0 {
return errors.New("resourceVersion should not be set on objects to be created")
}
if err := s.versioner.PrepareObjectForStorage(obj); err != nil {
return fmt.Errorf("PrepareObjectForStorage failed: %v", err)
}
data, err := runtime.Encode(s.codec, obj)
if err != nil {
return err
}
key = path.Join(s.pathPrefix, key)
opts, err := s.ttlOpts(ctx, int64(ttl))
if err != nil {
return err
}
newData, err := s.transformer.TransformToStorage(data, authenticatedDataString(key))
if err != nil {
return storage.NewInternalError(err.Error())
}
txnResp, err := s.client.KV.Txn(ctx).If(
notFound(key),
).Then(
clientv3.OpPut(key, string(newData), opts...),
).Commit()
if err != nil {
return err
}
if !txnResp.Succeeded {
return storage.NewKeyExistsError(key, 0)
}
if out != nil {
putResp := txnResp.Responses[0].GetResponsePut()
return decode(s.codec, s.versioner, data, out, putResp.Header.Revision)
}
return nil
}
Основные операции:
1. звонокEncodeсериализация методов;
2, звонитеpath.JoinКлюч разбора;
3, звонокTransformToStorageпреобразовать тип данных;
4. Вызовите клиентский метод, чтобы написать etcd.
На данный момент завершена привязка обработки обработчика и соответствующей операции базы данных etcd, то есть этапы работы всего бэкенда маршрутизации завершены.
Подробнее об операциях etcd см.эта статья.
Все вышеперечисленное является резюме личного обучения, пожалуйста, поправьте меня, если я ошибаюсь!