本文主要看一下apiserver的启动及三种server安装路由的流程。

kube-apiserver进程启动入口

//cmd/kube-apiserver/apiserver.go
import ("k8s.io/kubernetes/cmd/kube-apiserver/app"...
)func main() {rand.Seed(time.Now().UnixNano())pflag.CommandLine.SetNormalizeFunc(cliflag.WordSepNormalizeFunc)command := app.NewAPIServerCommand()logs.InitLogs()defer logs.FlushLogs()if err := command.Execute(); err != nil {os.Exit(1)}
}

使用cobra框架,最终调用Run

//cmd/kube-apiserver/app/server.go
//通过包导入的方式注册apiserver资源
import (//创建extensions全局注册表,并注册extensions资源到extensions全局注册表extensionsapiserver "k8s.io/apiextensions-apiserver/pkg/apiserver"//创建aggregator全局注册表,并注册aggregator资源到aggregator全局注册表aggregatorscheme "k8s.io/kube-aggregator/pkg/apiserver/scheme"//创建kubeapiserver全局注册表"k8s.io/kubernetes/pkg/api/legacyscheme"//注册kubeapiserver资源到kubeapiserver全局注册表"k8s.io/kubernetes/pkg/controlplane"...)// NewAPIServerCommand creates a *cobra.Command object with default parameters
func NewAPIServerCommand() *cobra.Command {cmd := &cobra.Command{RunE: func(cmd *cobra.Command, args []string) error {completedOptions, err := Complete(s)return Run(completedOptions, genericapiserver.SetupSignalHandler())}}
}

Run主流程,短短几行,但其内部的实现流程还是比较长的,接下来慢慢看

// Run runs the specified APIServer.  This should never exit.
func Run(completeOptions completedServerRunOptions, stopCh <-chan struct{}) error {// To help debugging, immediately log versionklog.Infof("Version: %+v", version.Get())//创建server链。kube-apiserver进程会启动三种server,以链的形式对外提供http服务,后面会详细讲解server, err := CreateServerChain(completeOptions, stopCh)//启动server前的准备工作prepared, err := server.PrepareRun()//最后启动http server,开始监听接收客户端请求return prepared.Run(stopCh)
}

CreateServerChain

// CreateServerChain creates the apiservers connected via delegation.
func CreateServerChain(completedOptions completedServerRunOptions, stopCh <-chan struct{}) (*aggregatorapiserver.APIAggregator, error) {//根据命令行选项创建通用配置kubeAPIServerConfig, serviceResolver, pluginInitializer, err := CreateKubeAPIServerConfig(completedOptions)//根据通用配置生成extensionserver需要的配置// If additional API servers are added, they should be gated.apiExtensionsConfig, err := createAPIExtensionsConfig(*kubeAPIServerConfig.GenericConfig, kubeAPIServerConfig.ExtraConfig.VersionedInformers, pluginInitializer, completedOptions.ServerRunOptions, completedOptions.MasterCount,serviceResolver, webhook.NewDefaultAuthenticationInfoResolverWrapper(kubeAPIServerConfig.ExtraConfig.ProxyTransport, kubeAPIServerConfig.GenericConfig.EgressSelector, kubeAPIServerConfig.GenericConfig.LoopbackClientConfig, kubeAPIServerConfig.GenericConfig.TracerProvider))//根据extensionserver配置创建extensionserver//因为extension server是chain上的最后一个server,需要传NewEmptyDelegate,表示不需要代理任何server,如果处理不了则返回404错误apiExtensionsServer, err := createAPIExtensionsServer(apiExtensionsConfig, genericapiserver.NewEmptyDelegate())//根据通用配置创建kubeapiserver//将extensionserver传给kubeapiserver,即kubeapiserver作为extensionserver的代理kubeAPIServer, err := CreateKubeAPIServer(kubeAPIServerConfig, apiExtensionsServer.GenericAPIServer)//根据通用配置生成aggregatorserver需要的配置// aggregator comes last in the chainaggregatorConfig, err := createAggregatorConfig(*kubeAPIServerConfig.GenericConfig, completedOptions.ServerRunOptions, kubeAPIServerConfig.ExtraConfig.VersionedInformers, serviceResolver, kubeAPIServerConfig.ExtraConfig.ProxyTransport, pluginInitializer)//根据aggregatorserver配置创建aggregatorserver//将kubeapiserver传递给aggregatorserver,即aggregatorserver作为kubeapiserver的代理aggregatorServer, err := createAggregatorServer(aggregatorConfig, kubeAPIServer.GenericAPIServer, apiExtensionsServer.Informers)//最后只需要将aggregatorServer返回,因为aggregatorServer是server chain的第一个server,所有的//请求会先经过它,如果处理不了,再将请求转发给kubeapiserver,如果仍然处理不了,最后转发给extensionserverreturn 
aggregatorServer, nil
}

CreateKubeAPIServerConfig

根据命令行参数创建通用配置

// CreateKubeAPIServerConfig creates all the resources for running the API server, but runs none of them
func CreateKubeAPIServerConfig(s completedServerRunOptions) (*controlplane.Config,aggregatorapiserver.ServiceResolver,[]admission.PluginInitializer,error,
) {genericConfig, versionedInformers, serviceResolver, pluginInitializers, admissionPostStartHook, storageFactory, err := buildGenericConfig(s.ServerRunOptions, proxyTransport)config := &controlplane.Config{GenericConfig: genericConfig,ExtraConfig: controlplane.ExtraConfig{...ServiceIPRange:          s.PrimaryServiceClusterIPRange,APIServerServiceIP:      s.APIServerServiceIP,SecondaryServiceIPRange: s.SecondaryServiceClusterIPRange,APIServerServicePort: 443,...}...return config, serviceResolver, pluginInitializers, nil
}// BuildGenericConfig takes the master server options and produces the genericapiserver.Config associated with it
func buildGenericConfig(s *options.ServerRunOptions,proxyTransport *http.Transport,
) (genericConfig *genericapiserver.Config,versionedInformers clientgoinformers.SharedInformerFactory,serviceResolver aggregatorapiserver.ServiceResolver,pluginInitializers []admission.PluginInitializer,admissionPostStartHook genericapiserver.PostStartHookFunc,storageFactory *serverstorage.DefaultStorageFactory,lastErr error,
) {genericConfig = genericapiserver.NewConfig(legacyscheme.Codecs)...//根据配置生成认证器// Authentication.ApplyTo requires already applied OpenAPIConfig and EgressSelector if presentif lastErr = s.Authentication.ApplyTo(&genericConfig.Authentication, genericConfig.SecureServing, genericConfig.EgressSelector, genericConfig.OpenAPIConfig, clientgoExternalClient, versionedInformers); lastErr != nil {return}//根据配置生成授权genericConfig.Authorization.Authorizer, genericConfig.RuleResolver, err = BuildAuthorizer(s, genericConfig.EgressSelector, versionedInformers)if err != nil {lastErr = fmt.Errorf("invalid authorization config: %v", err)return}//根据配置生成准入控制err = s.Admission.ApplyTo(genericConfig,versionedInformers,kubeClientConfig,utilfeature.DefaultFeatureGate,pluginInitializers...)return
}

1. createAPIExtensionsServer

apiextensionsserver用于处理对CRD资源的CRUD(增删改查)请求。

//cmd/kube-apiserver/app/apiextensions.go
func createAPIExtensionsServer(apiextensionsConfig *apiextensionsapiserver.Config, delegateAPIServer genericapiserver.DelegationTarget) (*apiextensionsapiserver.CustomResourceDefinitions, error) {return apiextensionsConfig.Complete().New(delegateAPIServer)
}//k8s.io/apiextensions-apiserver/pkg/apiserver/apiserver.go
// New returns a new instance of CustomResourceDefinitions from the given config.
func (c completedConfig) New(delegationTarget genericapiserver.DelegationTarget) (*CustomResourceDefinitions, error) {//创建通用server,传递了server名字和代理对象,后面会单独解释这个函数genericServer, err := c.GenericConfig.New("apiextensions-apiserver", delegationTarget)//crd结构体s := &CustomResourceDefinitions{GenericAPIServer: genericServer,}apiResourceConfig := c.GenericConfig.MergedResourceConfig//const GroupName = "apiextensions.k8s.io"//创建apiGroupInfoapiGroupInfo := genericapiserver.NewDefaultAPIGroupInfo(apiextensions.GroupName, Scheme, metav1.ParameterCodec, Codecs)return APIGroupInfo{//k8s.io/apiserver/pkg/server/genericapiserver.go//从资源注册表中获取group对应的资源的所有版本(优先级高的在前面)PrioritizedVersions:          scheme.PrioritizedVersionsForGroup(group),VersionedResourcesStorageMap: map[string]map[string]rest.Storage{},// TODO unhardcode this.  It was hardcoded before, but we need to re-evaluateOptionsExternalVersion: &schema.GroupVersion{Version: "v1"},Scheme:                 scheme,ParameterCodec:         parameterCodec,NegotiatedSerializer:   codecs,}//var SchemeGroupVersion = schema.GroupVersion{Group: GroupName, Version: "v1"}if apiResourceConfig.VersionEnabled(v1.SchemeGroupVersion) {//storage表示底层存储接口,每种资源对应一种storage。//收到创建crd请求后,会调用storage接口将数据保存到etcdstorage := map[string]rest.Storage{}// customresourcedefinitionscustomResourceDefinitionStorage, err := customresourcedefinition.NewREST(Scheme, c.GenericConfig.RESTOptionsGetter)storage["customresourcedefinitions"] = customResourceDefinitionStoragestorage["customresourcedefinitions/status"] = customresourcedefinition.NewStatusREST(Scheme, customResourceDefinitionStorage)apiGroupInfo.VersionedResourcesStorageMap[v1.SchemeGroupVersion.Version] = storage}//安装如下两个资源,暴露的restful路径如下// /apis/apiextensions.k8s.io/v1/customresourcedefinitions// /apis/apiextensions.k8s.io/v1/customresourcedefinitions/status//InstallAPIGroup会调用到installAPIResources,后面会详细分析此函数if err := s.GenericAPIServer.InstallAPIGroup(&apiGroupInfo); err != nil {return nil, err}crdClient, 
err := clientset.NewForConfig(s.GenericAPIServer.LoopbackClientConfig)s.Informers = externalinformers.NewSharedInformerFactory(crdClient, 5*time.Minute)//获取代理对象,如果为空,则设置默认notfounddelegateHandler := delegationTarget.UnprotectedHandler()if delegateHandler == nil {delegateHandler = http.NotFoundHandler()}versionDiscoveryHandler := &versionDiscoveryHandler{discovery: map[schema.GroupVersion]*discovery.APIVersionHandler{},delegate:  delegateHandler,}groupDiscoveryHandler := &groupDiscoveryHandler{discovery: map[string]*discovery.APIGroupHandler{},delegate:  delegateHandler,}establishingController := establish.NewEstablishingController(s.Informers.Apiextensions().V1().CustomResourceDefinitions(), crdClient.ApiextensionsV1())crdHandler, err := NewCustomResourceDefinitionHandler(versionDiscoveryHandler,groupDiscoveryHandler,s.Informers.Apiextensions().V1().CustomResourceDefinitions(),delegateHandler,...)//注册非gorestful路径//精确匹配/apis,即请求路径为/apis,则调用crdHandlers.GenericAPIServer.Handler.NonGoRestfulMux.Handle("/apis", crdHandler)//前缀匹配/apis/,即请求路径包含/apis/,则调用crdHandlers.GenericAPIServer.Handler.NonGoRestfulMux.HandlePrefix("/apis/", crdHandler)...
}

NewREST
每种k8s资源都需要对外提供restful风格的资源存储服务API,而且必须实现下面的接口

type Storage interface {// New returns an empty object that can be used with Create and Update after request data has been put into it.// This object must be a pointer type for use with Codec.DecodeInto([]byte, runtime.Object)New() runtime.Object
}

REST通过内嵌genericregistry.Store,实现了上面Storage接口中的New方法

//k8s.io/apiextensions-apiserver/pkg/registry/customresourcedefinition/etcd.go
type REST struct {*genericregistry.Store
}//k8s.io/apiserver/pkg/registry/generic/registry/store.go
type Store struct {// NewFunc returns a new instance of the type this registry returns for a// GET of a single object, e.g.://// curl GET /apis/group/version/namespaces/my-ns/myresource/name-of-objectNewFunc func() runtime.Object...
}//Store 实现了New()接口
// New implements RESTStorage.New.
func (e *Store) New() runtime.Object {return e.NewFunc()
}

NewREST初始化结构体genericregistry.Store

//k8s.io/apiextensions-apiserver/pkg/registry/customresourcedefinition/etcd.go
// NewREST returns a RESTStorage object that will work against API services.
func NewREST(scheme *runtime.Scheme, optsGetter generic.RESTOptionsGetter) (*REST, error) {...//结构体genericregistry.Store在k8s.io/apiserver/pkg/registry/generic/registry/store.gostore := &genericregistry.Store{NewFunc:                  func() runtime.Object { return &apiextensions.CustomResourceDefinition{} },NewListFunc:              func() runtime.Object { return &apiextensions.CustomResourceDefinitionList{} },PredicateFunc:            MatchCustomResourceDefinition,DefaultQualifiedResource: apiextensions.Resource("customresourcedefinitions"),CreateStrategy:      strategy,UpdateStrategy:      strategy,DeleteStrategy:      strategy,ResetFieldsStrategy: strategy,// TODO: define table converter that exposes more than name/creation timestampTableConvertor: rest.NewDefaultTableConvertor(apiextensions.Resource("customresourcedefinitions")),}options := &generic.StoreOptions{RESTOptions: optsGetter, AttrFunc: GetAttrs}if err := store.CompleteWithOptions(options); err != nil {return nil, err}return &REST{store}, nil
}

CompleteWithOptions根据传入的选项补全Store的各项配置,并初始化实现了storage.Interface接口的底层存储e.Storage,用于和etcd集群交互,存储请求数据。

//k8s.io/apiserver/pkg/registry/generic/registry/store.go
// CompleteWithOptions updates the store with the provided options and
// defaults common fields.
func (e *Store) CompleteWithOptions(options *generic.StoreOptions) error {//options.RESTOptions.GetRESTOptions 在k8s.io/apiserver/pkg/server/options/etcd.goopts, err := options.RESTOptions.GetRESTOptions(e.DefaultQualifiedResource)storageConfig, err := f.StorageFactory.NewConfig(resource)ret := generic.RESTOptions{StorageConfig:           storageConfig,Decorator:               generic.UndecoratedStorage,DeleteCollectionWorkers: f.Options.DeleteCollectionWorkers,EnableGarbageCollection: f.Options.EnableGarbageCollection,ResourcePrefix:          f.StorageFactory.ResourcePrefix(resource),CountMetricPollPeriod:   f.Options.StorageConfig.CountMetricPollPeriod,}if e.Storage.Storage == nil {e.Storage.Codec = opts.StorageConfig.Codecvar err error//opts.Decorator 为 generic.UndecoratedStoragee.Storage.Storage, e.DestroyFunc, err = opts.Decorator(opts.StorageConfig,prefix,keyFunc,e.NewFunc,e.NewListFunc,attrFunc,options.TriggerFunc,options.Indexers,)}
}

UndecoratedStorage调用栈,最后通过newStore创建了封装etcd client、用于和etcd交互的store

//k8s.io/apiserver/pkg/registry/generic/storage_decorator.go
UndecoratedStoragereturn NewRawStorage(config, newFunc)//k8s.io/apiserver/pkg/storage/storagebackend/factory/factory.goreturn factory.Create(*config, newFunc)//k8s.io/apiserver/pkg/storage/storagebackend/factory/etcd3.goreturn newETCD3Storage(c, newFunc)//k8s.io/apiserver/pkg/storage/etcd3/store.goreturn etcd3.New(client, c.Codec, newFunc, c.Prefix, transformer, c.Paging, c.LeaseManagerConfig), destroyFunc, nilreturn newStore(c, codec, newFunc, prefix, transformer, pagingEnabled, leaseManagerConfig)func newStore(c *clientv3.Client, codec runtime.Codec, newFunc func() runtime.Object, prefix string, transformer value.Transformer, pagingEnabled bool, leaseManagerConfig LeaseManagerConfig) *store {versioner := APIObjectVersioner{}result := &store{client:        c,codec:         codec,versioner:     versioner,transformer:   transformer,pagingEnabled: pagingEnabled,// for compatibility with etcd2 impl.// no-op for default prefix of '/registry'.// keeps compatibility with etcd2 impl for custom prefixes that don't start with '/'pathPrefix:   path.Join("/", prefix),watcher:      newWatcher(c, codec, newFunc, versioner, transformer),leaseManager: newDefaultLeaseManager(c, leaseManagerConfig),}return result
}// Create implements storage.Interface.Create.
func (s *store) Create(ctx context.Context, key string, obj, out runtime.Object, ttl uint64) error {txnResp, err := s.client.KV.Txn(ctx).If(notFound(key),).Then(clientv3.OpPut(key, string(newData), opts...),).Commit()
}

2. kubeAPIServer

kubeapiserver用于处理对核心无分组资源和分组资源的CRUD(增删改查)请求。这里重点看一下这些资源的注册过程。

//cmd/kube-apiserver/app/server.go
// CreateKubeAPIServer creates and wires a workable kube-apiserver
func CreateKubeAPIServer(kubeAPIServerConfig *controlplane.Config, delegateAPIServer genericapiserver.DelegationTarget) (*controlplane.Instance, error) {kubeAPIServer, err := kubeAPIServerConfig.Complete().New(delegateAPIServer)return kubeAPIServer, nil
}//pkg/controlplane/instance.go
func (c completedConfig) New(delegationTarget genericapiserver.DelegationTarget) (*Instance, error) {s, err := c.GenericConfig.New("kube-apiserver", delegationTarget)...m := &Instance{GenericAPIServer:          s,ClusterAuthenticationInfo: c.ExtraConfig.ClusterAuthenticationInfo,}// install legacy rest storageif c.ExtraConfig.APIResourceConfigSource.VersionEnabled(apiv1.SchemeGroupVersion) {legacyRESTStorageProvider := corerest.LegacyRESTStorageProvider{StorageFactory:              c.ExtraConfig.StorageFactory,ProxyTransport:              c.ExtraConfig.ProxyTransport,KubeletClientConfig:         c.ExtraConfig.KubeletClientConfig,EventTTL:                    c.ExtraConfig.EventTTL,ServiceIPRange:              c.ExtraConfig.ServiceIPRange,SecondaryServiceIPRange:     c.ExtraConfig.SecondaryServiceIPRange,ServiceNodePortRange:        c.ExtraConfig.ServiceNodePortRange,LoopbackClientConfig:        c.GenericConfig.LoopbackClientConfig,ServiceAccountIssuer:        c.ExtraConfig.ServiceAccountIssuer,ExtendExpiration:            c.ExtraConfig.ExtendExpiration,ServiceAccountMaxExpiration: c.ExtraConfig.ServiceAccountMaxExpiration,APIAudiences:                c.GenericConfig.Authentication.APIAudiences,}//2.1 安装核心无分组资源,暴露的resfful api格式如下:/api/v1/resource,比如pod,service资源if err := m.InstallLegacyAPI(&c, c.GenericConfig.RESTOptionsGetter, legacyRESTStorageProvider); err != nil {return nil, err}}restStorageProviders := []RESTStorageProvider{apiserverinternalrest.StorageProvider{},authenticationrest.RESTStorageProvider{Authenticator: c.GenericConfig.Authentication.Authenticator, APIAudiences: c.GenericConfig.Authentication.APIAudiences},authorizationrest.RESTStorageProvider{Authorizer: c.GenericConfig.Authorization.Authorizer, RuleResolver: 
c.GenericConfig.RuleResolver},autoscalingrest.RESTStorageProvider{},batchrest.RESTStorageProvider{},certificatesrest.RESTStorageProvider{},coordinationrest.RESTStorageProvider{},discoveryrest.StorageProvider{},networkingrest.RESTStorageProvider{},noderest.RESTStorageProvider{},policyrest.RESTStorageProvider{},rbacrest.RESTStorageProvider{Authorizer: c.GenericConfig.Authorization.Authorizer},schedulingrest.RESTStorageProvider{},storagerest.RESTStorageProvider{},flowcontrolrest.RESTStorageProvider{},// keep apps after extensions so legacy clients resolve the extensions versions of shared resource names.// See https://github.com/kubernetes/kubernetes/issues/42392appsrest.StorageProvider{},admissionregistrationrest.RESTStorageProvider{},eventsrest.RESTStorageProvider{TTL: c.ExtraConfig.EventTTL},}//2.2 安装分组资源,暴露的resfful api格式如下:/apis/group/version/resource,比如deployment资源if err := m.InstallAPIs(c.ExtraConfig.APIResourceConfigSource, c.GenericConfig.RESTOptionsGetter, restStorageProviders...); err != nil {return nil, err}
}

2.1 InstallLegacyAPI
安装核心无分组资源

// InstallLegacyAPI will install the legacy APIs for the restStorageProviders if they are enabled.
func (m *Instance) InstallLegacyAPI(c *completedConfig, restOptionsGetter generic.RESTOptionsGetter, legacyRESTStorageProvider corerest.LegacyRESTStorageProvider) error {//2.1.1legacyRESTStorage, apiGroupInfo, err := legacyRESTStorageProvider.NewLegacyRESTStorage(restOptionsGetter)...//2.1.2//DefaultLegacyAPIPrefix = "/api"if err := m.GenericAPIServer.InstallLegacyAPIGroup(genericapiserver.DefaultLegacyAPIPrefix, &apiGroupInfo); err != nil {return fmt.Errorf("error in registering group versions: %v", err)}return nil
}

2.1.1 NewLegacyRESTStorage
创建APIGroupInfo,并创建每种资源需要的storage,存放于VersionedResourcesStorageMap中。

//pkg/registry/core/rest/storage_core.go
func (c LegacyRESTStorageProvider) NewLegacyRESTStorage(restOptionsGetter generic.RESTOptionsGetter) (LegacyRESTStorage, genericapiserver.APIGroupInfo, error) {apiGroupInfo := genericapiserver.APIGroupInfo{//从全局资源注册表legacyscheme中获取核心无分组资源的优先版本(对于核心无分组资源来说只有一个V1版本),“”“”表示无分组PrioritizedVersions:          legacyscheme.Scheme.PrioritizedVersionsForGroup(""),//用于存放不同版本资源对应的storageVersionedResourcesStorageMap: map[string]map[string]rest.Storage{},Scheme:                       legacyscheme.Scheme,ParameterCodec:               legacyscheme.ParameterCodec,NegotiatedSerializer:         legacyscheme.Codecs,}podStorage, err := podstore.NewStorage(restOptionsGetter,nodeStorage.KubeletConnectionInfo,c.ProxyTransport,podDisruptionClient,)restStorageMap := map[string]rest.Storage{"pods":             podStorage.Pod,...}//核心无分组资源只有v1版本apiGroupInfo.VersionedResourcesStorageMap["v1"] = restStorageMapreturn restStorage, apiGroupInfo, nil
}

2.1.2 InstallLegacyAPIGroup

//k8s.io/apiserver/pkg/server/genericapiserver.go
func (s *GenericAPIServer) InstallLegacyAPIGroup(apiPrefix string, apiGroupInfo *APIGroupInfo) error {...//将资源安装到/api/路径下,后面会再次分析此函数if err := s.installAPIResources(apiPrefix, apiGroupInfo, openAPIModels); err != nil {return err}...return nil
}

2.2 InstallAPIs
安装分组资源

//pkg/controlplane/instance.go
// InstallAPIs will install the APIs for the restStorageProviders if they are enabled.
func (m *Instance) InstallAPIs(apiResourceConfigSource serverstorage.APIResourceConfigSource, restOptionsGetter generic.RESTOptionsGetter, restStorageProviders ...RESTStorageProvider) error {//因为这里安装的是分组资源,所以需要多个apiGroupsInfoapiGroupsInfo := []*genericapiserver.APIGroupInfo{}...//restStorageProviders是数组类型,每个元素表示一种分组资源for _, restStorageBuilder := range restStorageProviders {groupName := restStorageBuilder.GroupName()if !apiResourceConfigSource.AnyVersionForGroupEnabled(groupName) {klog.V(1).Infof("Skipping disabled API group %q.", groupName)continue}//2.2.1 针对每种分组,生成apiGroupInfoapiGroupInfo, enabled, err := restStorageBuilder.NewRESTStorage(apiResourceConfigSource, restOptionsGetter)if err != nil {return fmt.Errorf("problem initializing API group %q : %v", groupName, err)}if !enabled {klog.Warningf("API group %q is not enabled, skipping.", groupName)continue}klog.V(1).Infof("Enabling API group %q.", groupName)apiGroupsInfo = append(apiGroupsInfo, &apiGroupInfo)}//2.2.2安装使能的所有分组资源m.GenericAPIServer.InstallAPIGroups(apiGroupsInfo...)return nil
}

2.2.1 NewRESTStorage
NewRESTStorage是RESTStorageProvider接口中的一个方法,每种分组资源都有其实现。

// RESTStorageProvider is a factory type for REST storage.
type RESTStorageProvider interface {GroupName() stringNewRESTStorage(apiResourceConfigSource serverstorage.APIResourceConfigSource, restOptionsGetter generic.RESTOptionsGetter) (genericapiserver.APIGroupInfo, bool, error)
}

这里以apps分组为例说明

//pkg/registry/apps/rest/storage_app.go
// StorageProvider is a struct for apps REST storage.
type StorageProvider struct{}// NewRESTStorage returns APIGroupInfo object.
func (p StorageProvider) NewRESTStorage(apiResourceConfigSource serverstorage.APIResourceConfigSource, restOptionsGetter generic.RESTOptionsGetter) (genericapiserver.APIGroupInfo, bool, error) {//const GroupName = "apps"//创建apiGroupInfoapiGroupInfo := genericapiserver.NewDefaultAPIGroupInfo(apps.GroupName, legacyscheme.Scheme, legacyscheme.ParameterCodec, legacyscheme.Codecs)// If you add a version here, be sure to add an entry in `k8s.io/kubernetes/cmd/kube-apiserver/app/aggregator.go with specific priorities.// TODO refactor the plumbing to provide the information in the APIGroupInfoif apiResourceConfigSource.VersionEnabled(appsapiv1.SchemeGroupVersion) {storageMap, err := p.v1Storage(apiResourceConfigSource, restOptionsGetter)if err != nil {return genericapiserver.APIGroupInfo{}, false, err}apiGroupInfo.VersionedResourcesStorageMap[appsapiv1.SchemeGroupVersion.Version] = storageMap}return apiGroupInfo, true, nil
}

v1Storage
apps分组包含deployments、daemonsets等资源,这里创建每种资源对应的storage

//pkg/registry/apps/rest/storage_app.go
func (p StorageProvider) v1Storage(apiResourceConfigSource serverstorage.APIResourceConfigSource, restOptionsGetter generic.RESTOptionsGetter) (map[string]rest.Storage, error) {storage := map[string]rest.Storage{}// deploymentsdeploymentStorage, err := deploymentstore.NewStorage(restOptionsGetter)if err != nil {return storage, err}storage["deployments"] = deploymentStorage.Deploymentstorage["deployments/status"] = deploymentStorage.Statusstorage["deployments/scale"] = deploymentStorage.Scale// statefulsetsstatefulSetStorage, err := statefulsetstore.NewStorage(restOptionsGetter)if err != nil {return storage, err}storage["statefulsets"] = statefulSetStorage.StatefulSetstorage["statefulsets/status"] = statefulSetStorage.Statusstorage["statefulsets/scale"] = statefulSetStorage.Scale// daemonsetsdaemonSetStorage, daemonSetStatusStorage, err := daemonsetstore.NewREST(restOptionsGetter)if err != nil {return storage, err}storage["daemonsets"] = daemonSetStoragestorage["daemonsets/status"] = daemonSetStatusStorage// replicasetsreplicaSetStorage, err := replicasetstore.NewStorage(restOptionsGetter)if err != nil {return storage, err}storage["replicasets"] = replicaSetStorage.ReplicaSetstorage["replicasets/status"] = replicaSetStorage.Statusstorage["replicasets/scale"] = replicaSetStorage.Scale// controllerrevisionshistoryStorage, err := controllerrevisionsstore.NewREST(restOptionsGetter)if err != nil {return storage, err}storage["controllerrevisions"] = historyStoragereturn storage, nil
}

2.2.2 InstallAPIGroups

//k8s.io/apiserver/pkg/server/genericapiserver.go
// Exposes given api groups in the API.
func (s *GenericAPIServer) InstallAPIGroups(apiGroupInfos ...*APIGroupInfo) error {...for _, apiGroupInfo := range apiGroupInfos {//安装路径到/apis//APIGroupPrefix = "/apis"s.installAPIResources(APIGroupPrefix, apiGroupInfo, openAPIModels)...}return nil
}

3. createAggregatorServer

aggregatorserver是serverchain上最后添加的server,所有的请求都会先经过aggregatorserver处理。

//cmd/kube-apiserver/app/aggregator.go
func createAggregatorServer(aggregatorConfig *aggregatorapiserver.Config, delegateAPIServer genericapiserver.DelegationTarget, apiExtensionInformers apiextensionsinformers.SharedInformerFactory) (*aggregatorapiserver.APIAggregator, error) {aggregatorServer, err := aggregatorConfig.Complete().NewWithDelegate(delegateAPIServer)...
}

NewWithDelegate

//k8s.io/kube-aggregator/pkg/apiserver/apiserver.go
// NewWithDelegate returns a new instance of APIAggregator from the given config.
func (c completedConfig) NewWithDelegate(delegationTarget genericapiserver.DelegationTarget) (*APIAggregator, error) {//创建通用servergenericServer, err := c.GenericConfig.New("kube-aggregator", delegationTarget)//创建apiGroupInfo,其中又会创建资源需要的storageapiGroupInfo := apiservicerest.NewRESTStorage(c.GenericConfig.MergedResourceConfig, c.GenericConfig.RESTOptionsGetter, resourceExpirationEvaluator.ShouldServeForVersion(1, 22))//安装路由,暴露的restful路径如下// /apis/apiregistration.k8s.io/v1/apiservices// /apis/apiregistration.k8s.io/v1/apiservices/status//InstallAPIGroup会调用到installAPIResources,后面会详细分析此函数s.GenericAPIServer.InstallAPIGroup(&apiGroupInfo)
}

NewRESTStorage

//k8s.io/kube-aggregator/pkg/registry/apiservice/rest/storage_apiservice.go
// NewRESTStorage returns an APIGroupInfo object that will work against apiservice.
func NewRESTStorage(apiResourceConfigSource serverstorage.APIResourceConfigSource, restOptionsGetter generic.RESTOptionsGetter, shouldServeBeta bool) genericapiserver.APIGroupInfo {//const GroupName = "apiregistration.k8s.io"apiGroupInfo := genericapiserver.NewDefaultAPIGroupInfo(apiregistration.GroupName, aggregatorscheme.Scheme, metav1.ParameterCodec, aggregatorscheme.Codecs)if apiResourceConfigSource.VersionEnabled(v1.SchemeGroupVersion) {storage := map[string]rest.Storage{}apiServiceREST := apiservicestorage.NewREST(aggregatorscheme.Scheme, restOptionsGetter)storage["apiservices"] = apiServiceRESTstorage["apiservices/status"] = apiservicestorage.NewStatusREST(aggregatorscheme.Scheme, apiServiceREST)apiGroupInfo.VersionedResourcesStorageMap["v1"] = storage}return apiGroupInfo
}

NewREST

//k8s.io/kube-aggregator/pkg/registry/apiservice/etcd/etcd.go
// NewREST returns a RESTStorage object that will work against API services.
func NewREST(scheme *runtime.Scheme, optsGetter generic.RESTOptionsGetter) *REST {strategy := apiservice.NewStrategy(scheme)store := &genericregistry.Store{NewFunc:                  func() runtime.Object { return &apiregistration.APIService{} },NewListFunc:              func() runtime.Object { return &apiregistration.APIServiceList{} },PredicateFunc:            apiservice.MatchAPIService,DefaultQualifiedResource: apiregistration.Resource("apiservices"),CreateStrategy:      strategy,UpdateStrategy:      strategy,DeleteStrategy:      strategy,ResetFieldsStrategy: strategy,// TODO: define table converter that exposes more than name/creation timestampTableConvertor: rest.NewDefaultTableConvertor(apiregistration.Resource("apiservices")),}options := &generic.StoreOptions{RESTOptions: optsGetter, AttrFunc: apiservice.GetAttrs}if err := store.CompleteWithOptions(options); err != nil {panic(err) // TODO: Propagate error up}return &REST{store}
}

4. installAPIResources

安装无分组资源和分组资源最后都会调用到installAPIResources,这里重点看一下此函数

//k8s.io/apiserver/pkg/server/genericapiserver.go
// installAPIResources is a private method for installing the REST storage backing each api groupversionresource
func (s *GenericAPIServer) installAPIResources(apiPrefix string, apiGroupInfo *APIGroupInfo, openAPIModels openapiproto.Models) error {var resourceInfos []*storageversion.ResourceInfo//遍历此分组的所有版本for _, groupVersion := range apiGroupInfo.PrioritizedVersions {...//安装route到GoRestfulContainerr, err := apiGroupVersion.InstallREST(s.Handler.GoRestfulContainer)resourceInfos = append(resourceInfos, r...)}...return nil
}// InstallREST registers the REST handlers (storage, watch, proxy and redirect) into a restful Container.
// It is expected that the provided path root prefix will serve all operations. Root MUST NOT end
// in a slash.
func (g *APIGroupVersion) InstallREST(container *restful.Container) ([]*storageversion.ResourceInfo, error) {prefix := path.Join(g.Root, g.GroupVersion.Group, g.GroupVersion.Version)installer := &APIInstaller{group:             g,prefix:            prefix,minRequestTimeout: g.MinRequestTimeout,}apiResources, resourceInfos, ws, registrationErrors := installer.Install()...//将ws添加到containercontainer.Add(ws)return removeNonPersistedResources(resourceInfos), utilerrors.NewAggregate(registrationErrors)
}// Install handlers for API resources.
func (a *APIInstaller) Install() ([]metav1.APIResource, []*storageversion.ResourceInfo, *restful.WebService, []error) {var apiResources []metav1.APIResourcevar resourceInfos []*storageversion.ResourceInfovar errors []errorws := a.newWebService()// Register the paths in a deterministic (sorted) order to get a deterministic swagger spec.paths := make([]string, len(a.group.Storage))var i int = 0for path := range a.group.Storage {paths[i] = pathi++}sort.Strings(paths)for _, path := range paths {apiResource, resourceInfo, err := a.registerResourceHandlers(path, a.group.Storage[path], ws)...}return apiResources, resourceInfos, ws, errors
}

registerResourceHandlers
为path对应的资源构造route(处理请求的handler由storage提供),并将route添加到ws。这是一个很长的函数,这里只截取了部分代码来展示大概流程

//k8s.io/apiserver/pkg/endpoints/install.go
func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storage, ws *restful.WebService) (*metav1.APIResource, *storageversion.ResourceInfo, error) {admit := a.group.Admit...// what verbs are supported by the storage, used to know what verbs we support per path//根据golang的断言语法判断storage是否支持Creater等接口creater, isCreater := storage.(rest.Creater)namedCreater, isNamedCreater := storage.(rest.NamedCreater)...// Get the list of actions for the given scope.switch {case !namespaceScoped:// Handle non-namespace scoped resources like nodes.resourcePath := resourceresourceParams := paramsitemPath := resourcePath + "/{name}"nameParams := append(params, nameParam)proxyParams := append(nameParams, pathParam)suffix := ""...actions = appendIf(actions, action{"PUT", itemPath, nameParams, namer, false}, isUpdater)actions = appendIf(actions, action{"PATCH", itemPath, nameParams, namer, false}, isPatcher)actions = appendIf(actions, action{"DELETE", itemPath, nameParams, namer, false}, isGracefulDeleter)...}for _, action := range actions {switch action.Verb {...case "POST": // Create a resource.var handler restful.RouteFunctionif isNamedCreater {//创建handler,收到对应的请求时调用handlerhandler = restfulCreateNamedResource(namedCreater, reqScope, admit)} else {handler = restfulCreateResource(creater, reqScope, admit)}handler = utilwarning.AddWarningsHandler(handler, warnings)article := GetArticleForNoun(kind, " ")doc := "create" + article + kindif isSubresource {doc = "create " + subresource + " of" + article + kind}//将route和handler关联起来route := ws.POST(action.Path).To(handler).Doc(doc).Param(ws.QueryParameter("pretty", "If 'true', then the output is pretty printed.")).Operation("create"+namespaced+kind+strings.Title(subresource)+operationSuffix).Produces(append(storageMeta.ProducesMIMETypes(action.Verb), mediaTypes...)...).Returns(http.StatusOK, "OK", producedObject).// TODO: in some cases, the API may return a v1.Status instead of the versioned object// but currently go-restful 
can't handle multiple different objects being returned.Returns(http.StatusCreated, "Created", producedObject).Returns(http.StatusAccepted, "Accepted", producedObject).Reads(defaultVersionedObject).Writes(producedObject)routes = append(routes, route)...}for _, route := range routes {//将route添加到webservice中ws.Route(route)}}

kube-apiserver源码分析相关推荐

  1. kubernetes apiserver源码分析二之路由

    apiserver的main函数在 k8s.io/kubernetes/cmd/kube-apiserver 目录. 但是大部分源码却在 k8s.io/apiserver 这个库里面. cmd 目录下的 ...

  2. Kube Controller Manager 源码分析

    Kube Controller Manager 源码分析 Controller Manager 在k8s 集群中扮演着中心管理的角色,它负责Deployment, StatefulSet, Repli ...

  3. kubeadm源码分析(内含kubernetes离线包,三步安装)

    k8s离线安装包 三步安装,简单到难以置信 kubeadm源码分析 说句实在话,kubeadm的代码写的真心一般,质量不是很高. 几个关键点来先说一下kubeadm干的几个核心的事: kubeadm ...

  4. Kubernetes StatefulSet源码分析

    2019独角兽企业重金招聘Python工程师标准>>> Author: xidianwangtao@gmail.com,Based on Kubernetes 1.9 摘要:Kube ...

  5. Docker源码分析(三):Docker Daemon启动

    http://www.infoq.com/cn/articles/docker-source-code-analysis-part3 1 前言 Docker诞生以来,便引领了轻量级虚拟化容器领域的技术 ...

  6. Kubernetes监控之Heapster源码分析

    源码版本 heapster version: release-1.2 简介 Heapster是Kubernetes下的一个监控项目,用于进行容器集群的监控和性能分析. 基本的功能及概念介绍可以回顾我之 ...

  7. Istio Pilot 源码分析(二)

    张海东, ‍多点生活(成都)云原生开发工程师. 本篇主要介绍 Pilot 源码中的 ServiceEntryStore 及其推送 xDS 的流程. 本文为 Istio Pilot 源码分析系列的第二篇 ...

  8. Istio Pilot 源码分析(一)

    张海东, 多点生活(成都)云原生开发工程师. Istio 作为目前 Service Mesh 方案中的翘楚,吸引着越来越多的企业及开发者.越来越多的团队想将其应用于微服务的治理,但在实际落地时却因为不 ...

  9. Kubernetes Client-go Informer 源码分析

    几乎所有的Controller manager 和CRD Controller 都会使用Client-go 的Informer 函数,这样通过Watch 或者Get List 可以获取对应的Objec ...

  10. kubeadm源码分析(kubernetes离线安装包,三步安装)

    k8s离线安装包 三步安装,简单到难以置信 kubeadm源码分析 说句实在话,kubeadm的代码写的真心一般,质量不是很高. 几个关键点来先说一下kubeadm干的几个核心的事: kubeadm ...

最新文章

  1. spyder清除控制台命令
  2. 微信小程序 - 支持html空格(提示)
  3. vue中Axios的封装与API接口的管理详解
  4. pyqt5讲解2:QPushButton,QRadioButton,QCheckBox
  5. 先排列再排行 html,三国中猛将的权威排名 颜良为何排列第二
  6. Leetcode 1109.航班预定统计 差分
  7. JAVA入门级教学之(final关键字)
  8. 商业有规律,赚钱有方法,不要在盲目努力了
  9. EyeQ Ultra 芯片 面向自动驾驶
  10. 转:linux下挂载移动硬盘
  11. 计算机导论学习综合训练及其答案
  12. mysql 定时任务 日志_mysql定时备份任务
  13. Windows 11 Manager(win11优化大师)官方中文版V1.0.0 | windows11优化软件下载
  14. 山东大学机电信息学院计算机网络,山东大学机电与信息工程学院2021考研复试考核内容...
  15. 嵌入式系统框架----软件篇
  16. 微信公众号一次性订阅消息功能开发实践
  17. 支付宝飞行模式/转卡/转账/h5拉起支付
  18. 在世界球场一球成名:HMS 生态为游戏开发者送出的助攻
  19. CodeForces 757 E.Bash Plays with Functions(积性函数+dp)
  20. MATLAB国产替代软件出现,这些学校已解决被禁用问题

热门文章

  1. 照片转3d模型_在线搭建3D场景
  2. 大数据常用shell脚本之kf脚本
  3. python 从函数中返回函数
  4. 如何通过几个简单的步骤编写一个漂亮的初级开发者简历
  5. 【立创EDA】快速入门与快捷功能
  6. android java 拷贝数据库文件到U盘,从U盘拷贝文件到私有目录下实现更新数据库文件
  7. Shopee虾皮网|打造Shopee爆款前,必须分析七大数据!
  8. cloudreve-自建云盘
  9. 浪潮服务器linux下升级固件,DELL戴尔PowerEdge服务器固件BIOS整体升级教程
  10. 按键精灵下载地址和学习地址