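Contents

Abstract:

Establishing the connection to k8s-apis:

Sequence diagram of the core processing:

Detailed function call stack:

Core functions:

Operator program startup

manager:New

cluster:New

client:NewDynamicRESTMapper

restmapper:NewDiscoveryRESTMapper

discovery:ServerGroupsAndResources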


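Abstract:

When an operator program starts, it establishes a connection to the k8s-apis service (the Kubernetes API server). This article records how the operator sets up its interaction with k8s-apis at startup.

Establishing the connection to k8s-apis:

Sequence diagram of the core processing:

Detailed function call stack: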

#0  k8s.io/client-go/rest.(*Request).tryThrottleWithInfo (r=0xc0001e1900, ctx=..., retryInfo=..., ~r2=...) at /root/zsl/work/helloworld/vendor/k8s.io/client-go/rest/request.go:593
#1  0x0000000000c04a05 in k8s.io/client-go/rest.(*Request).tryThrottle (r=0xc0001e1900, ctx=..., ~r1=...) at /root/zsl/work/helloworld/vendor/k8s.io/client-go/rest/request.go:610
#2  0x0000000000c081c7 in k8s.io/client-go/rest.(*Request).request (r=0xc0001e1900, ctx=..., fn={void (net/http.Request *, net/http.Response *)} 0xc000424408, ~r2=...) at /root/zsl/work/helloworld/vendor/k8s.io/client-go/rest/request.go:952
#3  0x0000000000c09197 in k8s.io/client-go/rest.(*Request).Do (r=0xc0001e1900, ctx=..., ~r1=...) at /root/zsl/work/helloworld/vendor/k8s.io/client-go/rest/request.go:1038
#4  0x00000000019560ad in k8s.io/client-go/discovery.(*DiscoveryClient).ServerGroups (d=0xc0006f44c0, apiGroupList=0x0, err=...) at /root/zsl/work/helloworld/vendor/k8s.io/client-go/discovery/discovery_client.go:161
#5  0x00000000019578f9 in k8s.io/client-go/discovery.ServerGroupsAndResources (d=..., ~r1=..., ~r2=..., ~r3=...) at /root/zsl/work/helloworld/vendor/k8s.io/client-go/discovery/discovery_client.go:258
#6  0x000000000195729e in k8s.io/client-go/discovery.(*DiscoveryClient).ServerGroupsAndResources.func1 (~r0=..., ~r1=..., ~r2=...) at /root/zsl/work/helloworld/vendor/k8s.io/client-go/discovery/discovery_client.go:223
#7  0x000000000195ad62 in k8s.io/client-go/discovery.withRetries (maxRetries=2, f={void ([]*k8s.io/apimachinery/pkg/apis/meta/v1.APIGroup *, []*k8s.io/apimachinery/pkg/apis/meta/v1.APIResourceList *, error *)} 0xc000424c20, ~r2=..., ~r3=..., ~r4=...) at /root/zsl/work/helloworld/vendor/k8s.io/client-go/discovery/discovery_client.go:452
#8  0x0000000001957099 in k8s.io/client-go/discovery.(*DiscoveryClient).ServerGroupsAndResources (d=0xc0006f44c0, ~r0=..., ~r1=..., ~r2=...) at /root/zsl/work/helloworld/vendor/k8s.io/client-go/discovery/discovery_client.go:222
#9  0x000000000195dc82 in k8s.io/client-go/restmapper.GetAPIGroupResources (cl=..., ~r1=..., ~r2=...) at /root/zsl/work/helloworld/vendor/k8s.io/client-go/restmapper/discovery.go:148
#10 0x00000000019601f1 in sigs.k8s.io/controller-runtime/pkg/client/apiutil.NewDynamicRESTMapper.func1 (~r0=..., ~r1=...) at /root/zsl/work/helloworld/vendor/sigs.k8s.io/controller-runtime/pkg/client/apiutil/dynamicrestmapper.go:85
#11 0x000000000196033e in sigs.k8s.io/controller-runtime/pkg/client/apiutil.(*dynamicRESTMapper).setStaticMapper (drm=0xc0006bebe0, ~r0=...) at /root/zsl/work/helloworld/vendor/sigs.k8s.io/controller-runtime/pkg/client/apiutil/dynamicrestmapper.go:118
#12 0x00000000019600b0 in sigs.k8s.io/controller-runtime/pkg/client/apiutil.NewDynamicRESTMapper (cfg=0xc0006ee480, opts=..., ~r2=..., ~r3=...) at /root/zsl/work/helloworld/vendor/sigs.k8s.io/controller-runtime/pkg/client/apiutil/dynamicrestmapper.go:98
#13 0x0000000001ad657a in sigs.k8s.io/controller-runtime/pkg/cluster.setOptionsDefaults.func1 (c=0xc0006ee480, ~r1=..., ~r2=...) at /root/zsl/work/helloworld/vendor/sigs.k8s.io/controller-runtime/pkg/cluster/cluster.go:216
#14 0x0000000001ad4bff in sigs.k8s.io/controller-runtime/pkg/cluster.New (config=0xc0006ee480, opts=..., ~r2=..., ~r3=...) at /root/zsl/work/helloworld/vendor/sigs.k8s.io/controller-runtime/pkg/cluster/cluster.go:158
#15 0x0000000001b16d28 in sigs.k8s.io/controller-runtime/pkg/manager.New (config=0xc0006ee480, options=..., ~r2=..., ~r3=...) at /root/zsl/work/helloworld/vendor/sigs.k8s.io/controller-runtime/pkg/manager/manager.go:313
#16 0x0000000001c04a75 in main.main () at /root/zsl/work/helloworld/main.go:68
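Frame #7 of the trace shows the discovery call going through withRetries with maxRetries=2. The following is a minimal, standard-library-only sketch of a retry wrapper of that general shape. It is not the client-go implementation: the string result type and the flakyDiscovery helper are hypothetical, and the real helper may be more selective about which errors it retries.

```go
package main

import (
	"errors"
	"fmt"
)

// withRetries is a hypothetical model of a retry wrapper: it calls f up to
// maxRetries times and returns as soon as one attempt succeeds.
func withRetries(maxRetries int, f func() (string, error)) (string, error) {
	var result string
	var err error
	for i := 0; i < maxRetries; i++ {
		result, err = f()
		if err == nil {
			return result, nil
		}
	}
	// Every attempt failed: return the last result and error.
	return result, err
}

// flakyDiscovery (hypothetical) returns a function that fails on its first
// call and succeeds afterwards, plus a pointer to its call counter.
func flakyDiscovery() (func() (string, error), *int) {
	calls := 0
	f := func() (string, error) {
		calls++
		if calls == 1 {
			return "", errors.New("transient discovery error")
		}
		return "groups-and-resources", nil
	}
	return f, &calls
}

func main() {
	f, calls := flakyDiscovery()
	result, err := withRetries(2, f)
	fmt.Println(result, err, *calls) // prints "groups-and-resources <nil> 2"
}
```

With a budget of two attempts, a single transient failure is absorbed and the caller only sees the successful second attempt.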

Core functions:

Operator program startup

func main() {
    var metricsAddr string
    var enableLeaderElection bool
    var probeAddr string
    flag.StringVar(&metricsAddr, "metrics-bind-address", ":9080", "The address the metric endpoint binds to.")
    flag.StringVar(&probeAddr, "health-probe-bind-address", ":9081", "The address the probe endpoint binds to.")
    flag.BoolVar(&enableLeaderElection, "leader-elect", false,
        "Enable leader election for controller manager. "+
            "Enabling this will ensure there is only one active controller manager.")
    opts := zap.Options{
        Development: true,
    }
    opts.BindFlags(flag.CommandLine)
    flag.Parse()

    ctrl.SetLogger(zap.New(zap.UseFlagOptions(&opts)))

    mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{
        Scheme:                 scheme,
        MetricsBindAddress:     metricsAddr,
        Port:                   9443,
        HealthProbeBindAddress: probeAddr,
        LeaderElection:         enableLeaderElection,
        LeaderElectionID:       "b84bc1d2.com.bolingcavalry",
    })
    if err != nil {
        setupLog.Error(err, "unable to start manager")
        os.Exit(1)
    }

    if err = (&controllers.GuestbookReconciler{
        Client: mgr.GetClient(),
        Scheme: mgr.GetScheme(),
    }).SetupWithManager(mgr); err != nil {
        setupLog.Error(err, "unable to create controller", "controller", "Guestbook")
        os.Exit(1)
    }
    //+kubebuilder:scaffold:builder

    if err := mgr.AddHealthzCheck("healthz", healthz.Ping); err != nil {
        setupLog.Error(err, "unable to set up health check")
        os.Exit(1)
    }
    if err := mgr.AddReadyzCheck("readyz", healthz.Ping); err != nil {
        setupLog.Error(err, "unable to set up ready check")
        os.Exit(1)
    }

    setupLog.Info("starting manager")
    if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil {
        setupLog.Error(err, "problem running manager")
        os.Exit(1)
    }
}

manager:New

// New returns a new Manager for creating Controllers.
func New(config *rest.Config, options Options) (Manager, error) {
    // Set default values for options fields
    options = setOptionsDefaults(options)

    cluster, err := cluster.New(config, func(clusterOptions *cluster.Options) {
        clusterOptions.Scheme = options.Scheme
        clusterOptions.MapperProvider = options.MapperProvider
        clusterOptions.Logger = options.Logger
        clusterOptions.SyncPeriod = options.SyncPeriod
        clusterOptions.Namespace = options.Namespace
        clusterOptions.NewCache = options.NewCache
        clusterOptions.NewClient = options.NewClient
        clusterOptions.ClientDisableCacheFor = options.ClientDisableCacheFor
        clusterOptions.DryRunClient = options.DryRunClient
        clusterOptions.EventBroadcaster = options.EventBroadcaster //nolint:staticcheck
    })
    if err != nil {
        return nil, err
    }

    // Create the recorder provider to inject event recorders for the components.
    // TODO(directxman12): the log for the event provider should have a context (name, tags, etc) specific
    // to the particular controller that it's being injected into, rather than a generic one like is here.
    recorderProvider, err := options.newRecorderProvider(config, cluster.GetScheme(), options.Logger.WithName("events"), options.makeBroadcaster)
    if err != nil {
        return nil, err
    }

    // Create the resource lock to enable leader election)
    leaderConfig := options.LeaderElectionConfig
    if leaderConfig == nil {
        leaderConfig = rest.CopyConfig(config)
    }
    resourceLock, err := options.newResourceLock(leaderConfig, recorderProvider, leaderelection.Options{
        LeaderElection:             options.LeaderElection,
        LeaderElectionResourceLock: options.LeaderElectionResourceLock,
        LeaderElectionID:           options.LeaderElectionID,
        LeaderElectionNamespace:    options.LeaderElectionNamespace,
    })
    if err != nil {
        return nil, err
    }

    // Create the metrics listener. This will throw an error if the metrics bind
    // address is invalid or already in use.
    metricsListener, err := options.newMetricsListener(options.MetricsBindAddress)
    if err != nil {
        return nil, err
    }

    // By default we have no extra endpoints to expose on metrics http server.
    metricsExtraHandlers := make(map[string]http.Handler)

    // Create health probes listener. This will throw an error if the bind
    // address is invalid or already in use.
    healthProbeListener, err := options.newHealthProbeListener(options.HealthProbeBindAddress)
    if err != nil {
        return nil, err
    }

    errChan := make(chan error)
    runnables := newRunnables(errChan)

    return &controllerManager{
        stopProcedureEngaged:          pointer.Int64(0),
        cluster:                       cluster,
        runnables:                     runnables,
        errChan:                       errChan,
        recorderProvider:              recorderProvider,
        resourceLock:                  resourceLock,
        metricsListener:               metricsListener,
        metricsExtraHandlers:          metricsExtraHandlers,
        controllerOptions:             options.Controller,
        logger:                        options.Logger,
        elected:                       make(chan struct{}),
        port:                          options.Port,
        host:                          options.Host,
        certDir:                       options.CertDir,
        webhookServer:                 options.WebhookServer,
        leaseDuration:                 *options.LeaseDuration,
        renewDeadline:                 *options.RenewDeadline,
        retryPeriod:                   *options.RetryPeriod,
        healthProbeListener:           healthProbeListener,
        readinessEndpointName:         options.ReadinessEndpointName,
        livenessEndpointName:          options.LivenessEndpointName,
        gracefulShutdownTimeout:       *options.GracefulShutdownTimeout,
        internalProceduresStop:        make(chan struct{}),
        leaderElectionStopped:         make(chan struct{}),
        leaderElectionReleaseOnCancel: options.LeaderElectionReleaseOnCancel,
    }, nil
}

cluster:New

// New constructs a brand new cluster.
func New(config *rest.Config, opts ...Option) (Cluster, error) {
    if config == nil {
        return nil, errors.New("must specify Config")
    }

    options := Options{}
    for _, opt := range opts {
        opt(&options)
    }
    options = setOptionsDefaults(options)

    // Create the mapper provider
    mapper, err := options.MapperProvider(config)
    if err != nil {
        options.Logger.Error(err, "Failed to get API Group-Resources")
        return nil, err
    }

    // Create the cache for the cached read client and registering informers
    cache, err := options.NewCache(config, cache.Options{Scheme: options.Scheme, Mapper: mapper, Resync: options.SyncPeriod, Namespace: options.Namespace})
    if err != nil {
        return nil, err
    }

    clientOptions := client.Options{Scheme: options.Scheme, Mapper: mapper}

    apiReader, err := client.New(config, clientOptions)
    if err != nil {
        return nil, err
    }

    writeObj, err := options.NewClient(cache, config, clientOptions, options.ClientDisableCacheFor...)
    if err != nil {
        return nil, err
    }

    if options.DryRunClient {
        writeObj = client.NewDryRunClient(writeObj)
    }

    // Create the recorder provider to inject event recorders for the components.
    // TODO(directxman12): the log for the event provider should have a context (name, tags, etc) specific
    // to the particular controller that it's being injected into, rather than a generic one like is here.
    recorderProvider, err := options.newRecorderProvider(config, options.Scheme, options.Logger.WithName("events"), options.makeBroadcaster)
    if err != nil {
        return nil, err
    }

    return &cluster{
        config:           config,
        scheme:           options.Scheme,
        cache:            cache,
        fieldIndexes:     cache,
        client:           writeObj,
        apiReader:        apiReader,
        recorderProvider: recorderProvider,
        mapper:           mapper,
        logger:           options.Logger,
    }, nil
}
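cluster.New applies the caller's option functions first and only then fills in defaults. This matches frame #13 of the trace, where the mapper is created from inside a closure belonging to setOptionsDefaults. The sketch below models that ordering using only the standard library. The trimmed-down Options struct, the string-typed mapper and newCluster are hypothetical stand-ins, not controller-runtime types.

```go
package main

import "fmt"

// Options is a hypothetical, trimmed-down stand-in for cluster.Options.
type Options struct {
	Namespace      string
	MapperProvider func() (string, error)
}

// Option mutates Options, mirroring the opts ...Option parameter of cluster.New.
type Option func(*Options)

// setOptionsDefaults fills in any field the caller left unset.
func setOptionsDefaults(o Options) Options {
	if o.MapperProvider == nil {
		// Default provider, installed as a closure (assumed behavior for this sketch).
		o.MapperProvider = func() (string, error) { return "default-mapper", nil }
	}
	return o
}

// newCluster applies caller options, then defaults, then builds the mapper.
func newCluster(opts ...Option) (namespace string, mapper string, err error) {
	options := Options{}
	for _, opt := range opts {
		opt(&options)
	}
	options = setOptionsDefaults(options)

	mapper, err = options.MapperProvider()
	if err != nil {
		return "", "", err
	}
	return options.Namespace, mapper, nil
}

func main() {
	ns, mapper, err := newCluster(func(o *Options) { o.Namespace = "demo" })
	fmt.Println(ns, mapper, err) // prints "demo default-mapper <nil>"
}
```

Because defaults are applied after the caller's options, a caller who supplies a MapperProvider keeps it, while a caller who leaves it nil gets the default closure.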

client:NewDynamicRESTMapper

// NewDynamicRESTMapper returns a dynamic RESTMapper for cfg. The dynamic
// RESTMapper dynamically discovers resource types at runtime. opts
// configure the RESTMapper.
func NewDynamicRESTMapper(cfg *rest.Config, opts ...DynamicRESTMapperOption) (meta.RESTMapper, error) {
    client, err := discovery.NewDiscoveryClientForConfig(cfg)
    if err != nil {
        return nil, err
    }
    drm := &dynamicRESTMapper{
        limiter: rate.NewLimiter(rate.Limit(defaultRefillRate), defaultLimitSize),
        newMapper: func() (meta.RESTMapper, error) {
            groupResources, err := restmapper.GetAPIGroupResources(client)
            if err != nil {
                return nil, err
            }
            return restmapper.NewDiscoveryRESTMapper(groupResources), nil
        },
    }
    for _, opt := range opts {
        if err = opt(drm); err != nil {
            return nil, err
        }
    }
    if !drm.lazy {
        if err := drm.setStaticMapper(); err != nil {
            return nil, err
        }
    }
    return drm, nil
}
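The `if !drm.lazy` branch above explains why the discovery request already appears during manager.New: unless the mapper is configured as lazy, setStaticMapper runs inside the constructor. The sketch below is a standard-library-only model of that eager versus lazy choice. The lazyMapper type, the builds counter, the "guestbooks" resource name and the build-on-first-lookup behavior in resourceFor are all assumptions made for illustration, not the controller-runtime implementation.

```go
package main

import "fmt"

// lazyMapper is a hypothetical stand-in for dynamicRESTMapper.
type lazyMapper struct {
	lazy      bool
	static    map[string]string
	newMapper func() (map[string]string, error)
	builds    int // how many times the mapping has been built
}

// setStaticMapper builds the mapping through the newMapper factory.
func (m *lazyMapper) setStaticMapper() error {
	mapping, err := m.newMapper()
	if err != nil {
		return err
	}
	m.builds++
	m.static = mapping
	return nil
}

// newLazyMapper builds the mapping immediately unless lazy is true.
func newLazyMapper(lazy bool) (*lazyMapper, error) {
	m := &lazyMapper{
		lazy: lazy,
		newMapper: func() (map[string]string, error) {
			// Stand-in for a discovery round trip to the API server.
			return map[string]string{"Guestbook": "guestbooks"}, nil
		},
	}
	if !m.lazy {
		if err := m.setStaticMapper(); err != nil {
			return nil, err
		}
	}
	return m, nil
}

// resourceFor builds the mapping on first use if it has not been built yet.
func (m *lazyMapper) resourceFor(kind string) (string, error) {
	if m.static == nil {
		if err := m.setStaticMapper(); err != nil {
			return "", err
		}
	}
	res, ok := m.static[kind]
	if !ok {
		return "", fmt.Errorf("no mapping for kind %q", kind)
	}
	return res, nil
}

func main() {
	eager, _ := newLazyMapper(false)
	lazy, _ := newLazyMapper(true)
	fmt.Println(eager.builds, lazy.builds) // prints "1 0"

	res, _ := lazy.resourceFor("Guestbook")
	fmt.Println(res, lazy.builds) // prints "guestbooks 1"
}
```

In the eager case the cost of discovery is paid at startup, which is the path captured in the call stack; in the lazy case it is deferred until a lookup needs it.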

restmapper:NewDiscoveryRESTMapper

// NewDiscoveryRESTMapper returns a PriorityRESTMapper based on the discovered
// groups and resources passed in.
func NewDiscoveryRESTMapper(groupResources []*APIGroupResources) meta.RESTMapper {
    unionMapper := meta.MultiRESTMapper{}

    var groupPriority []string
    // /v1 is special.  It should always come first
    resourcePriority := []schema.GroupVersionResource{{Group: "", Version: "v1", Resource: meta.AnyResource}}
    kindPriority := []schema.GroupVersionKind{{Group: "", Version: "v1", Kind: meta.AnyKind}}

    for _, group := range groupResources {
        groupPriority = append(groupPriority, group.Group.Name)

        // Make sure the preferred version comes first
        if len(group.Group.PreferredVersion.Version) != 0 {
            preferred := group.Group.PreferredVersion.Version
            if _, ok := group.VersionedResources[preferred]; ok {
                resourcePriority = append(resourcePriority, schema.GroupVersionResource{
                    Group:    group.Group.Name,
                    Version:  group.Group.PreferredVersion.Version,
                    Resource: meta.AnyResource,
                })

                kindPriority = append(kindPriority, schema.GroupVersionKind{
                    Group:   group.Group.Name,
                    Version: group.Group.PreferredVersion.Version,
                    Kind:    meta.AnyKind,
                })
            }
        }

        for _, discoveryVersion := range group.Group.Versions {
            resources, ok := group.VersionedResources[discoveryVersion.Version]
            if !ok {
                continue
            }

            // Add non-preferred versions after the preferred version, in case there are resources that only exist in those versions
            if discoveryVersion.Version != group.Group.PreferredVersion.Version {
                resourcePriority = append(resourcePriority, schema.GroupVersionResource{
                    Group:    group.Group.Name,
                    Version:  discoveryVersion.Version,
                    Resource: meta.AnyResource,
                })

                kindPriority = append(kindPriority, schema.GroupVersionKind{
                    Group:   group.Group.Name,
                    Version: discoveryVersion.Version,
                    Kind:    meta.AnyKind,
                })
            }

            gv := schema.GroupVersion{Group: group.Group.Name, Version: discoveryVersion.Version}
            versionMapper := meta.NewDefaultRESTMapper([]schema.GroupVersion{gv})

            for _, resource := range resources {
                scope := meta.RESTScopeNamespace
                if !resource.Namespaced {
                    scope = meta.RESTScopeRoot
                }

                // if we have a slash, then this is a subresource and we shouldn't create mappings for those.
                if strings.Contains(resource.Name, "/") {
                    continue
                }

                plural := gv.WithResource(resource.Name)
                singular := gv.WithResource(resource.SingularName)
                // this is for legacy resources and servers which don't list singular forms.  For those we must still guess.
                if len(resource.SingularName) == 0 {
                    _, singular = meta.UnsafeGuessKindToResource(gv.WithKind(resource.Kind))
                }

                versionMapper.AddSpecific(gv.WithKind(strings.ToLower(resource.Kind)), plural, singular, scope)
                versionMapper.AddSpecific(gv.WithKind(resource.Kind), plural, singular, scope)
                // TODO this is producing unsafe guesses that don't actually work, but it matches previous behavior
                versionMapper.Add(gv.WithKind(resource.Kind+"List"), scope)
            }
            // TODO why is this type not in discovery (at least for "v1")
            versionMapper.Add(gv.WithKind("List"), meta.RESTScopeRoot)
            unionMapper = append(unionMapper, versionMapper)
        }
    }

    for _, group := range groupPriority {
        resourcePriority = append(resourcePriority, schema.GroupVersionResource{
            Group:    group,
            Version:  meta.AnyVersion,
            Resource: meta.AnyResource,
        })
        kindPriority = append(kindPriority, schema.GroupVersionKind{
            Group:   group,
            Version: meta.AnyVersion,
            Kind:    meta.AnyKind,
        })
    }

    return meta.PriorityRESTMapper{
        Delegate:         unionMapper,
        ResourcePriority: resourcePriority,
        KindPriority:     kindPriority,
    }
}
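The ordering that NewDiscoveryRESTMapper builds can be easier to see in isolation: core /v1 first, then each group's preferred version followed by its other versions, and finally one any-version entry per group. The sketch below reproduces only that ordering with plain strings. The group type, buildPriority and the sample groups are hypothetical, and the check against VersionedResources is deliberately left out.

```go
package main

import "fmt"

// group is a hypothetical, simplified view of one discovered API group.
type group struct {
	Name      string
	Preferred string   // preferred version, may be empty
	Versions  []string // all versions, in discovery order
}

// buildPriority returns "group/version" entries in priority order.
func buildPriority(groups []group) []string {
	// The core /v1 group always comes first.
	priority := []string{"/v1"}
	var names []string

	for _, g := range groups {
		names = append(names, g.Name)
		// The preferred version of the group goes ahead of its other versions.
		if g.Preferred != "" {
			priority = append(priority, g.Name+"/"+g.Preferred)
		}
		for _, v := range g.Versions {
			if v != g.Preferred {
				priority = append(priority, g.Name+"/"+v)
			}
		}
	}

	// Wildcard ("any version") entries come last, one per group.
	for _, name := range names {
		priority = append(priority, name+"/*")
	}
	return priority
}

func main() {
	groups := []group{
		{Name: "apps", Preferred: "v1", Versions: []string{"v1beta1", "v1"}},
		{Name: "batch", Preferred: "v1", Versions: []string{"v1", "v1beta1"}},
	}
	for _, entry := range buildPriority(groups) {
		fmt.Println(entry)
	}
}
```

For the two sample groups this yields /v1, apps/v1, apps/v1beta1, batch/v1, batch/v1beta1, apps/*, batch/*, so a preferred version wins even when discovery lists an older version first.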

discovery:ServerGroupsAndResources

func ServerGroupsAndResources(d DiscoveryInterface) ([]*metav1.APIGroup, []*metav1.APIResourceList, error) {
    sgs, err := d.ServerGroups()
    if sgs == nil {
        return nil, nil, err
    }
    resultGroups := []*metav1.APIGroup{}
    for i := range sgs.Groups {
        resultGroups = append(resultGroups, &sgs.Groups[i])
    }

    groupVersionResources, failedGroups := fetchGroupVersionResources(d, sgs)

    // order results by group/version discovery order
    result := []*metav1.APIResourceList{}
    for _, apiGroup := range sgs.Groups {
        for _, version := range apiGroup.Versions {
            gv := schema.GroupVersion{Group: apiGroup.Name, Version: version.Version}
            if resources, ok := groupVersionResources[gv]; ok {
                result = append(result, resources)
            }
        }
    }

    if len(failedGroups) == 0 {
        return resultGroups, result, nil
    }

    return resultGroups, result, &ErrGroupDiscoveryFailed{Groups: failedGroups}
}
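ServerGroupsAndResources does not give up when some group versions fail: it returns whatever it managed to fetch together with an ErrGroupDiscoveryFailed describing the failures. The sketch below models that partial-result pattern with the standard library only. discoveryFailed, collect, fakeFetch and the metrics.example.com group are hypothetical names chosen for illustration.

```go
package main

import (
	"errors"
	"fmt"
)

// discoveryFailed is a hypothetical analogue of ErrGroupDiscoveryFailed.
type discoveryFailed struct {
	Groups map[string]error // failed group version -> cause
}

func (e *discoveryFailed) Error() string {
	return fmt.Sprintf("unable to retrieve %d group version(s)", len(e.Groups))
}

// collect queries every group version in order, keeps the successes and
// reports the failures in a single aggregated error.
func collect(gvs []string, fetch func(string) ([]string, error)) ([][]string, error) {
	var result [][]string
	failed := map[string]error{}
	for _, gv := range gvs {
		resources, err := fetch(gv)
		if err != nil {
			failed[gv] = err
			continue
		}
		result = append(result, resources)
	}
	if len(failed) == 0 {
		return result, nil
	}
	// Partial success: results and error are both returned.
	return result, &discoveryFailed{Groups: failed}
}

// fakeFetch (hypothetical) fails for one group version and succeeds otherwise.
func fakeFetch(gv string) ([]string, error) {
	if gv == "metrics.example.com/v1beta1" {
		return nil, errors.New("service unavailable")
	}
	return []string{gv + ":resource"}, nil
}

func main() {
	result, err := collect([]string{"v1", "apps/v1", "metrics.example.com/v1beta1"}, fakeFetch)
	var failed *discoveryFailed
	fmt.Println(len(result), errors.As(err, &failed)) // prints "2 true"
}
```

A caller of this shape of API can choose to treat the aggregated error as fatal or to continue with the partial results, which is why both values are returned.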

2022-03-11 The process by which an operator program connects to kube-apis at startup