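kube-apiserver source code analysis
This article walks through how kube-apiserver starts up and how each of its three servers installs its routes.
Entry point of the kube-apiserver process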
// cmd/kube-apiserver/apiserver.go
import (
	"k8s.io/kubernetes/cmd/kube-apiserver/app"
	...
)

func main() {
	rand.Seed(time.Now().UnixNano())

	pflag.CommandLine.SetNormalizeFunc(cliflag.WordSepNormalizeFunc)

	command := app.NewAPIServerCommand()

	logs.InitLogs()
	defer logs.FlushLogs()

	if err := command.Execute(); err != nil {
		os.Exit(1)
	}
}
The command is built with the cobra framework; executing it eventually calls Run.
// cmd/kube-apiserver/app/server.go
// The API resources are registered as a side effect of importing these packages.
import (
	// Creates the global extensions scheme (registry) and registers the extensions resources in it.
	extensionsapiserver "k8s.io/apiextensions-apiserver/pkg/apiserver"
	// Creates the global aggregator scheme and registers the aggregator resources in it.
	aggregatorscheme "k8s.io/kube-aggregator/pkg/apiserver/scheme"
	// Creates the global kube-apiserver scheme.
	"k8s.io/kubernetes/pkg/api/legacyscheme"
	// Registers the kube-apiserver resources in the global kube-apiserver scheme.
	"k8s.io/kubernetes/pkg/controlplane"
	...
)

// NewAPIServerCommand creates a *cobra.Command object with default parameters
func NewAPIServerCommand() *cobra.Command {
	cmd := &cobra.Command{
		RunE: func(cmd *cobra.Command, args []string) error {
			completedOptions, err := Complete(s)
			return Run(completedOptions, genericapiserver.SetupSignalHandler())
		},
	}
}
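The "register by importing" pattern above relies on Go running package-level initialization before main. The following is a minimal single-file sketch of that idea, not the real scheme code: `scheme` and `register` are hypothetical stand-ins for a global registry such as legacyscheme.Scheme and its install functions.

```go
package main

import "fmt"

// scheme is a hypothetical stand-in for a global registry such as
// legacyscheme.Scheme; the real type is far richer than a set of names.
var scheme = map[string]bool{}

// register is a hypothetical helper that records a kind in the registry.
func register(kind string) { scheme[kind] = true }

// init runs automatically when the package is loaded. In a real program
// this lives in the imported package, so a plain import is enough to
// populate the registry.
func init() {
	register("Pod")
	register("Service")
}

func main() {
	// By the time main runs, the registry is already populated.
	fmt.Println("registered kinds:", len(scheme), "Pod known:", scheme["Pod"])
}
```

In the real code base the init function sits in a separate package, which is why the import lines alone are enough to fill each scheme.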
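The main flow of Run is only a few lines long, but what happens inside it is fairly involved. The sections below go through it step by step.
// Run runs the specified APIServer. This should never exit.
func Run(completeOptions completedServerRunOptions, stopCh <-chan struct{}) error {
	// To help debugging, immediately log version
	klog.Infof("Version: %+v", version.Get())

	// Create the server chain. The kube-apiserver process starts three servers that
	// serve HTTP as a delegation chain; this is explained in detail later.
	server, err := CreateServerChain(completeOptions, stopCh)

	// Preparation before the server is started.
	prepared, err := server.PrepareRun()

	// Finally start the HTTP server and begin accepting client requests.
	return prepared.Run(stopCh)
}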
CreateServerChain
// CreateServerChain creates the apiservers connected via delegation.
func CreateServerChain(completedOptions completedServerRunOptions, stopCh <-chan struct{}) (*aggregatorapiserver.APIAggregator, error) {
	// Build the generic configuration from the command-line options.
	kubeAPIServerConfig, serviceResolver, pluginInitializer, err := CreateKubeAPIServerConfig(completedOptions)

	// Derive the extensions server configuration from the generic configuration.
	// If additional API servers are added, they should be gated.
	apiExtensionsConfig, err := createAPIExtensionsConfig(*kubeAPIServerConfig.GenericConfig, kubeAPIServerConfig.ExtraConfig.VersionedInformers, pluginInitializer, completedOptions.ServerRunOptions, completedOptions.MasterCount,
		serviceResolver, webhook.NewDefaultAuthenticationInfoResolverWrapper(kubeAPIServerConfig.ExtraConfig.ProxyTransport, kubeAPIServerConfig.GenericConfig.EgressSelector, kubeAPIServerConfig.GenericConfig.LoopbackClientConfig, kubeAPIServerConfig.GenericConfig.TracerProvider))

	// Create the extensions server from its configuration.
	// It is the last server in the chain, so it receives NewEmptyDelegate: there is
	// nothing further to delegate to, and a request it cannot handle ends in a 404.
	apiExtensionsServer, err := createAPIExtensionsServer(apiExtensionsConfig, genericapiserver.NewEmptyDelegate())

	// Create the kube-apiserver from the generic configuration.
	// The extensions server is passed in as its delegate, so kube-apiserver forwards
	// requests it cannot handle to the extensions server.
	kubeAPIServer, err := CreateKubeAPIServer(kubeAPIServerConfig, apiExtensionsServer.GenericAPIServer)

	// Derive the aggregator server configuration from the generic configuration.
	// aggregator comes last in the chain
	aggregatorConfig, err := createAggregatorConfig(*kubeAPIServerConfig.GenericConfig, completedOptions.ServerRunOptions, kubeAPIServerConfig.ExtraConfig.VersionedInformers, serviceResolver, kubeAPIServerConfig.ExtraConfig.ProxyTransport, pluginInitializer)

	// Create the aggregator server from its configuration.
	// kube-apiserver is passed in as its delegate, so the aggregator forwards
	// requests it cannot handle to kube-apiserver.
	aggregatorServer, err := createAggregatorServer(aggregatorConfig, kubeAPIServer.GenericAPIServer, apiExtensionsServer.Informers)

	// Only aggregatorServer needs to be returned. It is created last but is the first
	// server in the chain: every request reaches it first, is forwarded to kube-apiserver
	// if the aggregator cannot handle it, and finally to the extensions server.
	return aggregatorServer, nil
}
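The delegation order described above (aggregator, then kube-apiserver, then the extensions server, then 404) can be sketched with plain net/http handlers. This is an illustrative sketch under assumptions, not the real GenericAPIServer wiring: `newServer`, `buildChain`, `get` and the example paths owned by each server are hypothetical.

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
)

// newServer returns a handler that serves the paths it owns and passes
// every other request to its delegate (hypothetical helper).
func newServer(name string, owned map[string]bool, delegate http.Handler) http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		if owned[r.URL.Path] {
			fmt.Fprintf(w, "served by %s", name)
			return
		}
		delegate.ServeHTTP(w, r)
	})
}

// buildChain creates the servers in the same order as CreateServerChain:
// the last server in the chain is built first and delegates to NotFound.
func buildChain() http.Handler {
	extensions := newServer("apiextensions-apiserver",
		map[string]bool{"/apis/example.com/v1/widgets": true}, http.NotFoundHandler())
	kube := newServer("kube-apiserver",
		map[string]bool{"/api/v1/pods": true}, extensions)
	return newServer("kube-aggregator",
		map[string]bool{"/apis/apiregistration.k8s.io/v1/apiservices": true}, kube)
}

// get sends a GET request through the handler and returns status and body.
func get(h http.Handler, path string) (int, string) {
	rec := httptest.NewRecorder()
	h.ServeHTTP(rec, httptest.NewRequest(http.MethodGet, path, nil))
	return rec.Code, rec.Body.String()
}

func main() {
	chain := buildChain()
	for _, p := range []string{"/api/v1/pods", "/apis/example.com/v1/widgets", "/nope"} {
		code, body := get(chain, p)
		fmt.Printf("%s -> %d %q\n", p, code, body)
	}
}
```

Only the outermost handler is exposed, which mirrors why CreateServerChain returns aggregatorServer alone.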
CreateKubeAPIServerConfig
Builds the generic configuration from the command-line options.
// CreateKubeAPIServerConfig creates all the resources for running the API server, but runs none of them
func CreateKubeAPIServerConfig(s completedServerRunOptions) (
	*controlplane.Config,
	aggregatorapiserver.ServiceResolver,
	[]admission.PluginInitializer,
	error,
) {
	genericConfig, versionedInformers, serviceResolver, pluginInitializers, admissionPostStartHook, storageFactory, err := buildGenericConfig(s.ServerRunOptions, proxyTransport)

	config := &controlplane.Config{
		GenericConfig: genericConfig,
		ExtraConfig: controlplane.ExtraConfig{
			...
			ServiceIPRange:          s.PrimaryServiceClusterIPRange,
			APIServerServiceIP:      s.APIServerServiceIP,
			SecondaryServiceIPRange: s.SecondaryServiceClusterIPRange,
			APIServerServicePort:    443,
			...
		}
	...
	return config, serviceResolver, pluginInitializers, nil
}

// BuildGenericConfig takes the master server options and produces the genericapiserver.Config associated with it
func buildGenericConfig(
	s *options.ServerRunOptions,
	proxyTransport *http.Transport,
) (
	genericConfig *genericapiserver.Config,
	versionedInformers clientgoinformers.SharedInformerFactory,
	serviceResolver aggregatorapiserver.ServiceResolver,
	pluginInitializers []admission.PluginInitializer,
	admissionPostStartHook genericapiserver.PostStartHookFunc,
	storageFactory *serverstorage.DefaultStorageFactory,
	lastErr error,
) {
	genericConfig = genericapiserver.NewConfig(legacyscheme.Codecs)
	...
	// Build the authenticator from the options.
	// Authentication.ApplyTo requires already applied OpenAPIConfig and EgressSelector if present
	if lastErr = s.Authentication.ApplyTo(&genericConfig.Authentication, genericConfig.SecureServing, genericConfig.EgressSelector, genericConfig.OpenAPIConfig, clientgoExternalClient, versionedInformers); lastErr != nil {
		return
	}

	// Build the authorizer from the options.
	genericConfig.Authorization.Authorizer, genericConfig.RuleResolver, err = BuildAuthorizer(s, genericConfig.EgressSelector, versionedInformers)
	if err != nil {
		lastErr = fmt.Errorf("invalid authorization config: %v", err)
		return
	}

	// Build admission control from the options.
	err = s.Admission.ApplyTo(
		genericConfig,
		versionedInformers,
		kubeClientConfig,
		utilfeature.DefaultFeatureGate,
		pluginInitializers...)
	return
}
1. createAPIExtensionsServer
The apiextensions server handles CRUD requests for CRD (CustomResourceDefinition) resources.
// cmd/kube-apiserver/app/apiextensions.go
func createAPIExtensionsServer(apiextensionsConfig *apiextensionsapiserver.Config, delegateAPIServer genericapiserver.DelegationTarget) (*apiextensionsapiserver.CustomResourceDefinitions, error) {
	return apiextensionsConfig.Complete().New(delegateAPIServer)
}

// k8s.io/apiextensions-apiserver/pkg/apiserver/apiserver.go
// New returns a new instance of CustomResourceDefinitions from the given config.
func (c completedConfig) New(delegationTarget genericapiserver.DelegationTarget) (*CustomResourceDefinitions, error) {
	// Create the generic server, passing the server name and the delegate.
	// This function is explained separately later.
	genericServer, err := c.GenericConfig.New("apiextensions-apiserver", delegationTarget)

	// The CRD server struct.
	s := &CustomResourceDefinitions{
		GenericAPIServer: genericServer,
	}

	apiResourceConfig := c.GenericConfig.MergedResourceConfig
	// const GroupName = "apiextensions.k8s.io"
	// Create the apiGroupInfo.
	apiGroupInfo := genericapiserver.NewDefaultAPIGroupInfo(apiextensions.GroupName, Scheme, metav1.ParameterCodec, Codecs)
	// The body of NewDefaultAPIGroupInfo (k8s.io/apiserver/pkg/server/genericapiserver.go),
	// inlined here for reference:
	//	return APIGroupInfo{
	//		// All versions of the group taken from the scheme, highest priority first.
	//		PrioritizedVersions:          scheme.PrioritizedVersionsForGroup(group),
	//		VersionedResourcesStorageMap: map[string]map[string]rest.Storage{},
	//		// TODO unhardcode this. It was hardcoded before, but we need to re-evaluate
	//		OptionsExternalVersion: &schema.GroupVersion{Version: "v1"},
	//		Scheme:                 scheme,
	//		ParameterCodec:         parameterCodec,
	//		NegotiatedSerializer:   codecs,
	//	}

	// var SchemeGroupVersion = schema.GroupVersion{Group: GroupName, Version: "v1"}
	if apiResourceConfig.VersionEnabled(v1.SchemeGroupVersion) {
		// storage is the backing storage interface; every resource has its own storage.
		// When a create-CRD request arrives, the storage is called to persist the data in etcd.
		storage := map[string]rest.Storage{}
		// customresourcedefinitions
		customResourceDefinitionStorage, err := customresourcedefinition.NewREST(Scheme, c.GenericConfig.RESTOptionsGetter)
		storage["customresourcedefinitions"] = customResourceDefinitionStorage
		storage["customresourcedefinitions/status"] = customresourcedefinition.NewStatusREST(Scheme, customResourceDefinitionStorage)

		apiGroupInfo.VersionedResourcesStorageMap[v1.SchemeGroupVersion.Version] = storage
	}

	// Install the two resources above. The exposed RESTful paths are:
	//   /apis/apiextensions.k8s.io/v1/customresourcedefinitions
	//   /apis/apiextensions.k8s.io/v1/customresourcedefinitions/status
	// InstallAPIGroup ends up in installAPIResources, which is analyzed in detail later.
	if err := s.GenericAPIServer.InstallAPIGroup(&apiGroupInfo); err != nil {
		return nil, err
	}

	crdClient, err := clientset.NewForConfig(s.GenericAPIServer.LoopbackClientConfig)
	s.Informers = externalinformers.NewSharedInformerFactory(crdClient, 5*time.Minute)

	// Get the delegate's handler; if it is nil, fall back to the default NotFound handler.
	delegateHandler := delegationTarget.UnprotectedHandler()
	if delegateHandler == nil {
		delegateHandler = http.NotFoundHandler()
	}

	versionDiscoveryHandler := &versionDiscoveryHandler{
		discovery: map[schema.GroupVersion]*discovery.APIVersionHandler{},
		delegate:  delegateHandler,
	}
	groupDiscoveryHandler := &groupDiscoveryHandler{
		discovery: map[string]*discovery.APIGroupHandler{},
		delegate:  delegateHandler,
	}
	establishingController := establish.NewEstablishingController(s.Informers.Apiextensions().V1().CustomResourceDefinitions(), crdClient.ApiextensionsV1())
	crdHandler, err := NewCustomResourceDefinitionHandler(
		versionDiscoveryHandler,
		groupDiscoveryHandler,
		s.Informers.Apiextensions().V1().CustomResourceDefinitions(),
		delegateHandler,
		...
	)

	// Register the paths that are not served by go-restful.
	// Exact match: a request whose path is exactly /apis is handled by crdHandler.
	s.GenericAPIServer.Handler.NonGoRestfulMux.Handle("/apis", crdHandler)
	// Prefix match: a request whose path starts with /apis/ is handled by crdHandler.
	s.GenericAPIServer.Handler.NonGoRestfulMux.HandlePrefix("/apis/", crdHandler)
	...
}
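The difference between the exact registration of /apis and the prefix registration of /apis/ can be illustrated with a toy mux. This is a sketch under assumptions, not the real NonGoRestfulMux: `toyMux`, `newCRDMux` and `lookup` are hypothetical, handlers are represented by their names only, and the "exact match first, then longest prefix" order is how this sketch resolves a path.

```go
package main

import (
	"fmt"
	"strings"
)

// toyMux is a hypothetical stand-in for a mux that supports both exact
// and prefix registrations; handlers are plain names for simplicity.
type toyMux struct {
	exact    map[string]string
	prefixes map[string]string
}

// Handle registers a handler for one exact path.
func (m *toyMux) Handle(path, handler string) { m.exact[path] = handler }

// HandlePrefix registers a handler for every path starting with prefix.
func (m *toyMux) HandlePrefix(prefix, handler string) { m.prefixes[prefix] = handler }

// lookup tries the exact table first, then the longest matching prefix.
// An empty result means "not handled here", i.e. pass to the delegate.
func (m *toyMux) lookup(path string) string {
	if h, ok := m.exact[path]; ok {
		return h
	}
	best, handler := "", ""
	for p, h := range m.prefixes {
		if strings.HasPrefix(path, p) && len(p) > len(best) {
			best, handler = p, h
		}
	}
	return handler
}

// newCRDMux mirrors the two registrations made for crdHandler.
func newCRDMux() *toyMux {
	m := &toyMux{exact: map[string]string{}, prefixes: map[string]string{}}
	m.Handle("/apis", "crdHandler")
	m.HandlePrefix("/apis/", "crdHandler")
	return m
}

func main() {
	m := newCRDMux()
	for _, p := range []string{"/apis", "/apis/example.com/v1", "/apisx", "/healthz"} {
		fmt.Printf("%q -> %q\n", p, m.lookup(p))
	}
}
```

Both registrations are needed because a prefix of /apis/ does not cover the bare path /apis.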
NewREST
Every Kubernetes resource has to expose a RESTful storage API, and its storage type must implement the following interface:
type Storage interface {
	// New returns an empty object that can be used with Create and Update after request data has been put into it.
	// This object must be a pointer type for use with Codec.DecodeInto([]byte, runtime.Object)
	New() runtime.Object
}
REST satisfies this interface: it embeds *genericregistry.Store, which provides the New method.
// k8s.io/apiextensions-apiserver/pkg/registry/customresourcedefinition/etcd.go
type REST struct {
	*genericregistry.Store
}

// k8s.io/apiserver/pkg/registry/generic/registry/store.go
type Store struct {
	// NewFunc returns a new instance of the type this registry returns for a
	// GET of a single object, e.g.:
	//
	// curl GET /apis/group/version/namespaces/my-ns/myresource/name-of-object
	NewFunc func() runtime.Object
	...
}

// Store implements the New() method.
// New implements RESTStorage.New.
func (e *Store) New() runtime.Object {
	return e.NewFunc()
}
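How REST picks up New without declaring it comes down to Go struct embedding: methods of the embedded *Store are promoted to REST. A minimal sketch with simplified, hypothetical types (`Object` and `crd` stand in for runtime.Object and the CRD type; they are not the real definitions):

```go
package main

import "fmt"

// Object is a simplified stand-in for runtime.Object (assumption).
type Object interface{ Kind() string }

// Storage mirrors the shape of the interface shown above.
type Storage interface{ New() Object }

// crd is a hypothetical resource type used only for this example.
type crd struct{}

func (crd) Kind() string { return "CustomResourceDefinition" }

// Store plays the role of genericregistry.Store: it owns NewFunc and
// implements New by calling it.
type Store struct{ NewFunc func() Object }

func (s *Store) New() Object { return s.NewFunc() }

// REST embeds *Store, so Store's methods are promoted to REST.
type REST struct{ *Store }

// Compile-time check that *REST satisfies Storage through embedding.
var _ Storage = (*REST)(nil)

// newREST builds a REST whose NewFunc returns an empty crd.
func newREST() *REST {
	return &REST{&Store{NewFunc: func() Object { return crd{} }}}
}

func main() {
	var s Storage = newREST()
	fmt.Println(s.New().Kind())
}
```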
NewREST initializes the genericregistry.Store struct.
// k8s.io/apiextensions-apiserver/pkg/registry/customresourcedefinition/etcd.go
// NewREST returns a RESTStorage object that will work against API services.
func NewREST(scheme *runtime.Scheme, optsGetter generic.RESTOptionsGetter) (*REST, error) {
	...
	// The genericregistry.Store struct is defined in k8s.io/apiserver/pkg/registry/generic/registry/store.go
	store := &genericregistry.Store{
		NewFunc:                  func() runtime.Object { return &apiextensions.CustomResourceDefinition{} },
		NewListFunc:              func() runtime.Object { return &apiextensions.CustomResourceDefinitionList{} },
		PredicateFunc:            MatchCustomResourceDefinition,
		DefaultQualifiedResource: apiextensions.Resource("customresourcedefinitions"),

		CreateStrategy:      strategy,
		UpdateStrategy:      strategy,
		DeleteStrategy:      strategy,
		ResetFieldsStrategy: strategy,

		// TODO: define table converter that exposes more than name/creation timestamp
		TableConvertor: rest.NewDefaultTableConvertor(apiextensions.Resource("customresourcedefinitions")),
	}
	options := &generic.StoreOptions{RESTOptions: optsGetter, AttrFunc: GetAttrs}
	if err := store.CompleteWithOptions(options); err != nil {
		return nil, err
	}
	return &REST{store}, nil
}
CompleteWithOptions fills in e.Storage with an implementation of storage.Interface, which is what talks to the etcd cluster and persists the request data.
// k8s.io/apiserver/pkg/registry/generic/registry/store.go
// CompleteWithOptions updates the store with the provided options and
// defaults common fields.
func (e *Store) CompleteWithOptions(options *generic.StoreOptions) error {
	// options.RESTOptions.GetRESTOptions is implemented in k8s.io/apiserver/pkg/server/options/etcd.go
	opts, err := options.RESTOptions.GetRESTOptions(e.DefaultQualifiedResource)
	// The relevant part of GetRESTOptions, inlined here for reference:
	//	storageConfig, err := f.StorageFactory.NewConfig(resource)
	//	ret := generic.RESTOptions{
	//		StorageConfig:           storageConfig,
	//		Decorator:               generic.UndecoratedStorage,
	//		DeleteCollectionWorkers: f.Options.DeleteCollectionWorkers,
	//		EnableGarbageCollection: f.Options.EnableGarbageCollection,
	//		ResourcePrefix:          f.StorageFactory.ResourcePrefix(resource),
	//		CountMetricPollPeriod:   f.Options.StorageConfig.CountMetricPollPeriod,
	//	}

	if e.Storage.Storage == nil {
		e.Storage.Codec = opts.StorageConfig.Codec
		var err error
		// opts.Decorator is generic.UndecoratedStorage
		e.Storage.Storage, e.DestroyFunc, err = opts.Decorator(
			opts.StorageConfig,
			prefix,
			keyFunc,
			e.NewFunc,
			e.NewListFunc,
			attrFunc,
			options.TriggerFunc,
			options.Indexers,
		)
	}
}
The call stack below UndecoratedStorage ends by initializing the store that wraps the client used to talk to etcd.
// k8s.io/apiserver/pkg/registry/generic/storage_decorator.go
UndecoratedStorage
	return NewRawStorage(config, newFunc)
		// k8s.io/apiserver/pkg/storage/storagebackend/factory/factory.go
		return factory.Create(*config, newFunc)
			// k8s.io/apiserver/pkg/storage/storagebackend/factory/etcd3.go
			return newETCD3Storage(c, newFunc)
				// k8s.io/apiserver/pkg/storage/etcd3/store.go
				return etcd3.New(client, c.Codec, newFunc, c.Prefix, transformer, c.Paging, c.LeaseManagerConfig), destroyFunc, nil
					return newStore(c, codec, newFunc, prefix, transformer, pagingEnabled, leaseManagerConfig)

func newStore(c *clientv3.Client, codec runtime.Codec, newFunc func() runtime.Object, prefix string, transformer value.Transformer, pagingEnabled bool, leaseManagerConfig LeaseManagerConfig) *store {
	versioner := APIObjectVersioner{}
	result := &store{
		client:        c,
		codec:         codec,
		versioner:     versioner,
		transformer:   transformer,
		pagingEnabled: pagingEnabled,
		// for compatibility with etcd2 impl.
		// no-op for default prefix of '/registry'.
		// keeps compatibility with etcd2 impl for custom prefixes that don't start with '/'
		pathPrefix:   path.Join("/", prefix),
		watcher:      newWatcher(c, codec, newFunc, versioner, transformer),
		leaseManager: newDefaultLeaseManager(c, leaseManagerConfig),
	}
	return result
}

// Create implements storage.Interface.Create.
func (s *store) Create(ctx context.Context, key string, obj, out runtime.Object, ttl uint64) error {
	txnResp, err := s.client.KV.Txn(ctx).If(
		notFound(key),
	).Then(
		clientv3.OpPut(key, string(newData), opts...),
	).Commit()
}
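The transaction in Create reads as "put the key only if it does not exist yet", evaluated atomically on the etcd side. The sketch below imitates that semantic with an in-memory map guarded by a mutex; it does not use the etcd client, and `kv`, `newKV`, `create` and `errKeyExists` are hypothetical names chosen for the example.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// errKeyExists is a hypothetical error for a create on an existing key.
var errKeyExists = errors.New("key already exists")

// kv is an in-memory stand-in for the key-value store (assumption);
// the mutex makes check-and-put atomic, as the transaction does in etcd.
type kv struct {
	mu   sync.Mutex
	data map[string]string
}

func newKV() *kv { return &kv{data: map[string]string{}} }

// create stores value under key only when the key is absent, which is
// the in-memory equivalent of If(notFound(key)).Then(OpPut(key, value)).
func (s *kv) create(key, value string) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	if _, found := s.data[key]; found {
		return errKeyExists
	}
	s.data[key] = value
	return nil
}

func main() {
	s := newKV()
	fmt.Println("first create:", s.create("/registry/pods/default/a", "v1"))
	fmt.Println("second create:", s.create("/registry/pods/default/a", "v2"))
}
```

The second create fails and leaves the first value untouched, which is the property that makes creation safe when several writers race on the same key.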
2. kubeAPIServer
kube-apiserver handles CRUD requests for the built-in resources: the core (legacy) resources, which have no group name, and the resources in named API groups. The focus here is on how these resources are registered.
// cmd/kube-apiserver/app/server.go
// CreateKubeAPIServer creates and wires a workable kube-apiserver
func CreateKubeAPIServer(kubeAPIServerConfig *controlplane.Config, delegateAPIServer genericapiserver.DelegationTarget) (*controlplane.Instance, error) {
	kubeAPIServer, err := kubeAPIServerConfig.Complete().New(delegateAPIServer)
	return kubeAPIServer, nil
}

// pkg/controlplane/instance.go
func (c completedConfig) New(delegationTarget genericapiserver.DelegationTarget) (*Instance, error) {
	s, err := c.GenericConfig.New("kube-apiserver", delegationTarget)
	...
	m := &Instance{
		GenericAPIServer:          s,
		ClusterAuthenticationInfo: c.ExtraConfig.ClusterAuthenticationInfo,
	}

	// install legacy rest storage
	if c.ExtraConfig.APIResourceConfigSource.VersionEnabled(apiv1.SchemeGroupVersion) {
		legacyRESTStorageProvider := corerest.LegacyRESTStorageProvider{
			StorageFactory:              c.ExtraConfig.StorageFactory,
			ProxyTransport:              c.ExtraConfig.ProxyTransport,
			KubeletClientConfig:         c.ExtraConfig.KubeletClientConfig,
			EventTTL:                    c.ExtraConfig.EventTTL,
			ServiceIPRange:              c.ExtraConfig.ServiceIPRange,
			SecondaryServiceIPRange:     c.ExtraConfig.SecondaryServiceIPRange,
			ServiceNodePortRange:        c.ExtraConfig.ServiceNodePortRange,
			LoopbackClientConfig:        c.GenericConfig.LoopbackClientConfig,
			ServiceAccountIssuer:        c.ExtraConfig.ServiceAccountIssuer,
			ExtendExpiration:            c.ExtraConfig.ExtendExpiration,
			ServiceAccountMaxExpiration: c.ExtraConfig.ServiceAccountMaxExpiration,
			APIAudiences:                c.GenericConfig.Authentication.APIAudiences,
		}
		// 2.1 Install the core (ungrouped) resources. The exposed RESTful API has the
		// form /api/v1/resource, for example the pod and service resources.
		if err := m.InstallLegacyAPI(&c, c.GenericConfig.RESTOptionsGetter, legacyRESTStorageProvider); err != nil {
			return nil, err
		}
	}

	restStorageProviders := []RESTStorageProvider{
		apiserverinternalrest.StorageProvider{},
		authenticationrest.RESTStorageProvider{Authenticator: c.GenericConfig.Authentication.Authenticator, APIAudiences: c.GenericConfig.Authentication.APIAudiences},
		authorizationrest.RESTStorageProvider{Authorizer: c.GenericConfig.Authorization.Authorizer, RuleResolver: c.GenericConfig.RuleResolver},
		autoscalingrest.RESTStorageProvider{},
		batchrest.RESTStorageProvider{},
		certificatesrest.RESTStorageProvider{},
		coordinationrest.RESTStorageProvider{},
		discoveryrest.StorageProvider{},
		networkingrest.RESTStorageProvider{},
		noderest.RESTStorageProvider{},
		policyrest.RESTStorageProvider{},
		rbacrest.RESTStorageProvider{Authorizer: c.GenericConfig.Authorization.Authorizer},
		schedulingrest.RESTStorageProvider{},
		storagerest.RESTStorageProvider{},
		flowcontrolrest.RESTStorageProvider{},
		// keep apps after extensions so legacy clients resolve the extensions versions of shared resource names.
		// See https://github.com/kubernetes/kubernetes/issues/42392
		appsrest.StorageProvider{},
		admissionregistrationrest.RESTStorageProvider{},
		eventsrest.RESTStorageProvider{TTL: c.ExtraConfig.EventTTL},
	}
	// 2.2 Install the grouped resources. The exposed RESTful API has the form
	// /apis/group/version/resource, for example the deployment resource.
	if err := m.InstallAPIs(c.ExtraConfig.APIResourceConfigSource, c.GenericConfig.RESTOptionsGetter, restStorageProviders...); err != nil {
		return nil, err
	}
}
2.1 InstallLegacyAPI
Installs the core (ungrouped) resources.
// InstallLegacyAPI will install the legacy APIs for the restStorageProviders if they are enabled.
func (m *Instance) InstallLegacyAPI(c *completedConfig, restOptionsGetter generic.RESTOptionsGetter, legacyRESTStorageProvider corerest.LegacyRESTStorageProvider) error {
	// 2.1.1
	legacyRESTStorage, apiGroupInfo, err := legacyRESTStorageProvider.NewLegacyRESTStorage(restOptionsGetter)
	...
	// 2.1.2
	// DefaultLegacyAPIPrefix = "/api"
	if err := m.GenericAPIServer.InstallLegacyAPIGroup(genericapiserver.DefaultLegacyAPIPrefix, &apiGroupInfo); err != nil {
		return fmt.Errorf("error in registering group versions: %v", err)
	}
	return nil
}
2.1.1 NewLegacyRESTStorage
Creates the APIGroupInfo and the storage needed by each resource, and stores them in VersionedResourcesStorageMap.
// pkg/registry/core/rest/storage_core.go
func (c LegacyRESTStorageProvider) NewLegacyRESTStorage(restOptionsGetter generic.RESTOptionsGetter) (LegacyRESTStorage, genericapiserver.APIGroupInfo, error) {
	apiGroupInfo := genericapiserver.APIGroupInfo{
		// Look up the prioritized versions of the core group in the global scheme
		// legacyscheme.Scheme (the core group only has v1); "" denotes the core group,
		// which has no group name.
		PrioritizedVersions: legacyscheme.Scheme.PrioritizedVersionsForGroup(""),
		// Holds the storage of each resource, keyed by version.
		VersionedResourcesStorageMap: map[string]map[string]rest.Storage{},
		Scheme:                       legacyscheme.Scheme,
		ParameterCodec:               legacyscheme.ParameterCodec,
		NegotiatedSerializer:         legacyscheme.Codecs,
	}

	podStorage, err := podstore.NewStorage(
		restOptionsGetter,
		nodeStorage.KubeletConnectionInfo,
		c.ProxyTransport,
		podDisruptionClient,
	)

	restStorageMap := map[string]rest.Storage{
		"pods": podStorage.Pod,
		...
	}

	// The core resources only have the v1 version.
	apiGroupInfo.VersionedResourcesStorageMap["v1"] = restStorageMap
	return restStorage, apiGroupInfo, nil
}
2.1.2 InstallLegacyAPIGroup
// k8s.io/apiserver/pkg/server/genericapiserver.go
func (s *GenericAPIServer) InstallLegacyAPIGroup(apiPrefix string, apiGroupInfo *APIGroupInfo) error {
	...
	// Install the resources under the /api/ path. This function is revisited later.
	if err := s.installAPIResources(apiPrefix, apiGroupInfo, openAPIModels); err != nil {
		return err
	}
	...
	return nil
}
2.2 InstallAPIs
Installs the grouped resources.
// pkg/controlplane/instance.go
// InstallAPIs will install the APIs for the restStorageProviders if they are enabled.
func (m *Instance) InstallAPIs(apiResourceConfigSource serverstorage.APIResourceConfigSource, restOptionsGetter generic.RESTOptionsGetter, restStorageProviders ...RESTStorageProvider) error {
	// Grouped resources are installed here, so there is one APIGroupInfo per group.
	apiGroupsInfo := []*genericapiserver.APIGroupInfo{}
	...
	// restStorageProviders is a slice; each element stands for one API group.
	for _, restStorageBuilder := range restStorageProviders {
		groupName := restStorageBuilder.GroupName()
		if !apiResourceConfigSource.AnyVersionForGroupEnabled(groupName) {
			klog.V(1).Infof("Skipping disabled API group %q.", groupName)
			continue
		}
		// 2.2.1 Build the apiGroupInfo for this group.
		apiGroupInfo, enabled, err := restStorageBuilder.NewRESTStorage(apiResourceConfigSource, restOptionsGetter)
		if err != nil {
			return fmt.Errorf("problem initializing API group %q : %v", groupName, err)
		}
		if !enabled {
			klog.Warningf("API group %q is not enabled, skipping.", groupName)
			continue
		}
		klog.V(1).Infof("Enabling API group %q.", groupName)

		apiGroupsInfo = append(apiGroupsInfo, &apiGroupInfo)
	}

	// 2.2.2 Install all enabled API groups.
	m.GenericAPIServer.InstallAPIGroups(apiGroupsInfo...)
	return nil
}
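The loop above boils down to "ask every provider for its group, skip the ones that are disabled, install the rest". A simplified sketch of that control flow follows; `provider`, `installAPIs` and the `cfg` map keyed by "group/version" are hypothetical simplifications of RESTStorageProvider and APIResourceConfigSource.

```go
package main

import "fmt"

// provider is a hypothetical, simplified RESTStorageProvider: it knows
// its group name and the versions it is able to serve.
type provider struct {
	group    string
	versions []string
}

func (p provider) GroupName() string { return p.group }

// NewRESTStorage returns the versions enabled in cfg and reports false
// when no version of the group is enabled.
func (p provider) NewRESTStorage(cfg map[string]bool) ([]string, bool) {
	var enabled []string
	for _, v := range p.versions {
		if cfg[p.group+"/"+v] {
			enabled = append(enabled, v)
		}
	}
	return enabled, len(enabled) > 0
}

// installAPIs walks the providers in order, skips disabled groups and
// returns the /apis/<group>/<version> prefixes that would be installed.
func installAPIs(cfg map[string]bool, providers ...provider) []string {
	var installed []string
	for _, p := range providers {
		versions, ok := p.NewRESTStorage(cfg)
		if !ok {
			continue // group disabled, nothing to install
		}
		for _, v := range versions {
			installed = append(installed, "/apis/"+p.GroupName()+"/"+v)
		}
	}
	return installed
}

func main() {
	cfg := map[string]bool{"apps/v1": true, "batch/v1": false}
	fmt.Println(installAPIs(cfg,
		provider{group: "apps", versions: []string{"v1"}},
		provider{group: "batch", versions: []string{"v1"}},
	))
}
```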
2.2.1 NewRESTStorage
NewRESTStorage is a method of the RESTStorageProvider interface; every API group provides its own implementation.
// RESTStorageProvider is a factory type for REST storage.
type RESTStorageProvider interface {
	GroupName() string
	NewRESTStorage(apiResourceConfigSource serverstorage.APIResourceConfigSource, restOptionsGetter generic.RESTOptionsGetter) (genericapiserver.APIGroupInfo, bool, error)
}
The apps group is used as the example here.
// pkg/registry/apps/rest/storage_app.go
// StorageProvider is a struct for apps REST storage.
type StorageProvider struct{}

// NewRESTStorage returns APIGroupInfo object.
func (p StorageProvider) NewRESTStorage(apiResourceConfigSource serverstorage.APIResourceConfigSource, restOptionsGetter generic.RESTOptionsGetter) (genericapiserver.APIGroupInfo, bool, error) {
	// const GroupName = "apps"
	// Create the apiGroupInfo.
	apiGroupInfo := genericapiserver.NewDefaultAPIGroupInfo(apps.GroupName, legacyscheme.Scheme, legacyscheme.ParameterCodec, legacyscheme.Codecs)
	// If you add a version here, be sure to add an entry in `k8s.io/kubernetes/cmd/kube-apiserver/app/aggregator.go` with specific priorities.
	// TODO refactor the plumbing to provide the information in the APIGroupInfo

	if apiResourceConfigSource.VersionEnabled(appsapiv1.SchemeGroupVersion) {
		storageMap, err := p.v1Storage(apiResourceConfigSource, restOptionsGetter)
		if err != nil {
			return genericapiserver.APIGroupInfo{}, false, err
		}
		apiGroupInfo.VersionedResourcesStorageMap[appsapiv1.SchemeGroupVersion.Version] = storageMap
	}

	return apiGroupInfo, true, nil
}
v1Storage
The apps group contains resources such as deployments and daemonsets; v1Storage creates the storage for each of them.
// pkg/registry/apps/rest/storage_app.go
func (p StorageProvider) v1Storage(apiResourceConfigSource serverstorage.APIResourceConfigSource, restOptionsGetter generic.RESTOptionsGetter) (map[string]rest.Storage, error) {
	storage := map[string]rest.Storage{}

	// deployments
	deploymentStorage, err := deploymentstore.NewStorage(restOptionsGetter)
	if err != nil {
		return storage, err
	}
	storage["deployments"] = deploymentStorage.Deployment
	storage["deployments/status"] = deploymentStorage.Status
	storage["deployments/scale"] = deploymentStorage.Scale

	// statefulsets
	statefulSetStorage, err := statefulsetstore.NewStorage(restOptionsGetter)
	if err != nil {
		return storage, err
	}
	storage["statefulsets"] = statefulSetStorage.StatefulSet
	storage["statefulsets/status"] = statefulSetStorage.Status
	storage["statefulsets/scale"] = statefulSetStorage.Scale

	// daemonsets
	daemonSetStorage, daemonSetStatusStorage, err := daemonsetstore.NewREST(restOptionsGetter)
	if err != nil {
		return storage, err
	}
	storage["daemonsets"] = daemonSetStorage
	storage["daemonsets/status"] = daemonSetStatusStorage

	// replicasets
	replicaSetStorage, err := replicasetstore.NewStorage(restOptionsGetter)
	if err != nil {
		return storage, err
	}
	storage["replicasets"] = replicaSetStorage.ReplicaSet
	storage["replicasets/status"] = replicaSetStorage.Status
	storage["replicasets/scale"] = replicaSetStorage.Scale

	// controllerrevisions
	historyStorage, err := controllerrevisionsstore.NewREST(restOptionsGetter)
	if err != nil {
		return storage, err
	}
	storage["controllerrevisions"] = historyStorage

	return storage, nil
}
2.2.2 InstallAPIGroups
// k8s.io/apiserver/pkg/server/genericapiserver.go
// Exposes given api groups in the API.
func (s *GenericAPIServer) InstallAPIGroups(apiGroupInfos ...*APIGroupInfo) error {
	...
	for _, apiGroupInfo := range apiGroupInfos {
		// Install the routes under /apis.
		// APIGroupPrefix = "/apis"
		s.installAPIResources(APIGroupPrefix, apiGroupInfo, openAPIModels)
		...
	}
	return nil
}
3. createAggregatorServer
The aggregator server is the last server added to the server chain, and every request passes through it first.
// cmd/kube-apiserver/app/aggregator.go
func createAggregatorServer(aggregatorConfig *aggregatorapiserver.Config, delegateAPIServer genericapiserver.DelegationTarget, apiExtensionInformers apiextensionsinformers.SharedInformerFactory) (*aggregatorapiserver.APIAggregator, error) {
	aggregatorServer, err := aggregatorConfig.Complete().NewWithDelegate(delegateAPIServer)
	...
}
NewWithDelegate
// k8s.io/kube-aggregator/pkg/apiserver/apiserver.go
// NewWithDelegate returns a new instance of APIAggregator from the given config.
func (c completedConfig) NewWithDelegate(delegationTarget genericapiserver.DelegationTarget) (*APIAggregator, error) {
	// Create the generic server.
	genericServer, err := c.GenericConfig.New("kube-aggregator", delegationTarget)

	// Create the apiGroupInfo, which in turn creates the storage the resources need.
	apiGroupInfo := apiservicerest.NewRESTStorage(c.GenericConfig.MergedResourceConfig, c.GenericConfig.RESTOptionsGetter, resourceExpirationEvaluator.ShouldServeForVersion(1, 22))

	// Install the routes. The exposed RESTful paths are:
	//   /apis/apiregistration.k8s.io/v1/apiservices
	//   /apis/apiregistration.k8s.io/v1/apiservices/status
	// InstallAPIGroup ends up in installAPIResources, which is analyzed in detail later.
	s.GenericAPIServer.InstallAPIGroup(&apiGroupInfo)
}
NewRESTStorage
// k8s.io/kube-aggregator/pkg/registry/apiservice/rest/storage_apiservice.go
// NewRESTStorage returns an APIGroupInfo object that will work against apiservice.
func NewRESTStorage(apiResourceConfigSource serverstorage.APIResourceConfigSource, restOptionsGetter generic.RESTOptionsGetter, shouldServeBeta bool) genericapiserver.APIGroupInfo {
	// const GroupName = "apiregistration.k8s.io"
	apiGroupInfo := genericapiserver.NewDefaultAPIGroupInfo(apiregistration.GroupName, aggregatorscheme.Scheme, metav1.ParameterCodec, aggregatorscheme.Codecs)

	if apiResourceConfigSource.VersionEnabled(v1.SchemeGroupVersion) {
		storage := map[string]rest.Storage{}
		apiServiceREST := apiservicestorage.NewREST(aggregatorscheme.Scheme, restOptionsGetter)
		storage["apiservices"] = apiServiceREST
		storage["apiservices/status"] = apiservicestorage.NewStatusREST(aggregatorscheme.Scheme, apiServiceREST)
		apiGroupInfo.VersionedResourcesStorageMap["v1"] = storage
	}

	return apiGroupInfo
}
NewREST
// k8s.io/kube-aggregator/pkg/registry/apiservice/etcd/etcd.go
// NewREST returns a RESTStorage object that will work against API services.
func NewREST(scheme *runtime.Scheme, optsGetter generic.RESTOptionsGetter) *REST {
	strategy := apiservice.NewStrategy(scheme)
	store := &genericregistry.Store{
		NewFunc:                  func() runtime.Object { return &apiregistration.APIService{} },
		NewListFunc:              func() runtime.Object { return &apiregistration.APIServiceList{} },
		PredicateFunc:            apiservice.MatchAPIService,
		DefaultQualifiedResource: apiregistration.Resource("apiservices"),

		CreateStrategy:      strategy,
		UpdateStrategy:      strategy,
		DeleteStrategy:      strategy,
		ResetFieldsStrategy: strategy,

		// TODO: define table converter that exposes more than name/creation timestamp
		TableConvertor: rest.NewDefaultTableConvertor(apiregistration.Resource("apiservices")),
	}
	options := &generic.StoreOptions{RESTOptions: optsGetter, AttrFunc: apiservice.GetAttrs}
	if err := store.CompleteWithOptions(options); err != nil {
		panic(err) // TODO: Propagate error up
	}
	return &REST{store}
}
4. installAPIResources
Installing the ungrouped resources and installing the grouped resources both end up in installAPIResources, so this function deserves a closer look.
// k8s.io/apiserver/pkg/server/genericapiserver.go
// installAPIResources is a private method for installing the REST storage backing each api groupversionresource
func (s *GenericAPIServer) installAPIResources(apiPrefix string, apiGroupInfo *APIGroupInfo, openAPIModels openapiproto.Models) error {
	var resourceInfos []*storageversion.ResourceInfo
	// Iterate over all versions of this group.
	for _, groupVersion := range apiGroupInfo.PrioritizedVersions {
		...
		// Install the routes into GoRestfulContainer.
		r, err := apiGroupVersion.InstallREST(s.Handler.GoRestfulContainer)
		resourceInfos = append(resourceInfos, r...)
	}
	...
	return nil
}

// InstallREST registers the REST handlers (storage, watch, proxy and redirect) into a restful Container.
// It is expected that the provided path root prefix will serve all operations. Root MUST NOT end
// in a slash.
func (g *APIGroupVersion) InstallREST(container *restful.Container) ([]*storageversion.ResourceInfo, error) {
	prefix := path.Join(g.Root, g.GroupVersion.Group, g.GroupVersion.Version)
	installer := &APIInstaller{
		group:             g,
		prefix:            prefix,
		minRequestTimeout: g.MinRequestTimeout,
	}

	apiResources, resourceInfos, ws, registrationErrors := installer.Install()
	...
	// Add the WebService to the container.
	container.Add(ws)
	return removeNonPersistedResources(resourceInfos), utilerrors.NewAggregate(registrationErrors)
}

// Install handlers for API resources.
func (a *APIInstaller) Install() ([]metav1.APIResource, []*storageversion.ResourceInfo, *restful.WebService, []error) {
	var apiResources []metav1.APIResource
	var resourceInfos []*storageversion.ResourceInfo
	var errors []error
	ws := a.newWebService()

	// Register the paths in a deterministic (sorted) order to get a deterministic swagger spec.
	paths := make([]string, len(a.group.Storage))
	var i int = 0
	for path := range a.group.Storage {
		paths[i] = path
		i++
	}
	sort.Strings(paths)
	for _, path := range paths {
		apiResource, resourceInfo, err := a.registerResourceHandlers(path, a.group.Storage[path], ws)
		...
	}
	return apiResources, resourceInfos, ws, errors
}
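Two details of InstallREST and Install are easy to try in isolation: path.Join drops the empty group name, so the core group ends up under /api/v1, and the storage keys are sorted so that registration order does not depend on map iteration order. A small sketch, where `registrationOrder` is a hypothetical helper and only top-level resource names are used (the real route templates for subresources are built elsewhere):

```go
package main

import (
	"fmt"
	"path"
	"sort"
)

// registrationOrder returns prefix/<resource> for every storage key in
// sorted order. It is a hypothetical helper that only mimics how the
// prefix is built and how the keys are ordered before registration.
func registrationOrder(root, group, version string, storage map[string]struct{}) []string {
	// path.Join ignores empty elements, so group "" yields root/version.
	prefix := path.Join(root, group, version)

	names := make([]string, 0, len(storage))
	for name := range storage {
		names = append(names, name)
	}
	sort.Strings(names) // map iteration order is random, so sort first

	out := make([]string, 0, len(names))
	for _, name := range names {
		out = append(out, prefix+"/"+name)
	}
	return out
}

func main() {
	core := map[string]struct{}{"pods": {}, "nodes": {}, "services": {}}
	apps := map[string]struct{}{"deployments": {}, "daemonsets": {}}

	fmt.Println(registrationOrder("/api", "", "v1", core))
	fmt.Println(registrationOrder("/apis", "apps", "v1", apps))
}
```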
registerResourceHandlers
Builds a route from the handler (provided by the storage) for each path and adds the route to the WebService. The function is very long, so only an excerpt is shown to illustrate the overall flow.
// k8s.io/apiserver/pkg/endpoints/install.go
func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storage, ws *restful.WebService) (*metav1.APIResource, *storageversion.ResourceInfo, error) {
	admit := a.group.Admit
	...
	// what verbs are supported by the storage, used to know what verbs we support per path
	// Go type assertions determine whether the storage implements interfaces such as Creater.
	creater, isCreater := storage.(rest.Creater)
	namedCreater, isNamedCreater := storage.(rest.NamedCreater)
	...
	// Get the list of actions for the given scope.
	switch {
	case !namespaceScoped:
		// Handle non-namespace scoped resources like nodes.
		resourcePath := resource
		resourceParams := params
		itemPath := resourcePath + "/{name}"
		nameParams := append(params, nameParam)
		proxyParams := append(nameParams, pathParam)
		suffix := ""
		...
		actions = appendIf(actions, action{"PUT", itemPath, nameParams, namer, false}, isUpdater)
		actions = appendIf(actions, action{"PATCH", itemPath, nameParams, namer, false}, isPatcher)
		actions = appendIf(actions, action{"DELETE", itemPath, nameParams, namer, false}, isGracefulDeleter)
		...
	}

	for _, action := range actions {
		switch action.Verb {
		...
		case "POST": // Create a resource.
			var handler restful.RouteFunction
			if isNamedCreater {
				// Create the handler that is invoked when a matching request arrives.
				handler = restfulCreateNamedResource(namedCreater, reqScope, admit)
			} else {
				handler = restfulCreateResource(creater, reqScope, admit)
			}
			handler = utilwarning.AddWarningsHandler(handler, warnings)
			article := GetArticleForNoun(kind, " ")
			doc := "create" + article + kind
			if isSubresource {
				doc = "create " + subresource + " of" + article + kind
			}
			// Bind the route to the handler.
			route := ws.POST(action.Path).To(handler).
				Doc(doc).
				Param(ws.QueryParameter("pretty", "If 'true', then the output is pretty printed.")).
				Operation("create"+namespaced+kind+strings.Title(subresource)+operationSuffix).
				Produces(append(storageMeta.ProducesMIMETypes(action.Verb), mediaTypes...)...).
				Returns(http.StatusOK, "OK", producedObject).
				// TODO: in some cases, the API may return a v1.Status instead of the versioned object
				// but currently go-restful can't handle multiple different objects being returned.
				Returns(http.StatusCreated, "Created", producedObject).
				Returns(http.StatusAccepted, "Accepted", producedObject).
				Reads(defaultVersionedObject).
				Writes(producedObject)
			routes = append(routes, route)
		...
		}
		for _, route := range routes {
			// Add the route to the WebService.
			ws.Route(route)
		}
	}
	...
}
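The verb discovery at the top of registerResourceHandlers is ordinary Go type assertion: a storage gets a route for a verb only if it implements the matching interface. A reduced sketch with hypothetical, simplified interfaces (`Creater` and `Deleter` here take a name only and are not the real rest.Creater or rest.GracefulDeleter signatures):

```go
package main

import "fmt"

// Storage is the minimal interface every storage implements (simplified).
type Storage interface{ New() interface{} }

// Creater and Deleter are hypothetical, simplified capability interfaces.
type Creater interface{ Create(name string) error }
type Deleter interface{ Delete(name string) error }

// readOnly implements only Storage.
type readOnly struct{}

func (readOnly) New() interface{} { return struct{}{} }

// readWrite adds create and delete on top of readOnly.
type readWrite struct{ readOnly }

func (readWrite) Create(string) error { return nil }
func (readWrite) Delete(string) error { return nil }

// supportedVerbs derives the HTTP verbs from the interfaces the storage
// implements, in the same spirit as the isCreater style checks above.
func supportedVerbs(s Storage) []string {
	var verbs []string
	if _, ok := s.(Creater); ok {
		verbs = append(verbs, "POST")
	}
	if _, ok := s.(Deleter); ok {
		verbs = append(verbs, "DELETE")
	}
	return verbs
}

func main() {
	fmt.Println("readOnly:", supportedVerbs(readOnly{}))
	fmt.Println("readWrite:", supportedVerbs(readWrite{}))
}
```

This is why a storage type opts into a verb simply by implementing the corresponding method set; no separate registration table is needed.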