参考

  • (三)Kubernetes 源码剖析之学习Informer机制
  • 如何高效掌控K8s资源变化?K8s Informer实现机制浅析
  • 25 | 深入解析声明式API(二):编写自定义控制器
  • k8s client-go informer中的processorlistener数据消费,缓存的分析

架构

Informer 和 Controller

  • 便于理解的架构图
  • 这里 Indexer「索引」 和 Local Store 「缓存」 是分开表示的
    • 在源码级别,基本上是一起实现的,一个结构体内涵盖

Informer 简要架构

  • 源码级简要理解

Informer 详细架构

  • 源码级详细理解

Reflector

// staging/src/k8s.io/client-go/tools/cache/shared_informer.gotype sharedIndexInformer struct {// 索引和 缓存 storeindexer    Indexer// Informer 内部的 controller,不是我们自定义的 Controllercontroller Controller// 处理函数,将是重点processor *sharedProcessor// 检测 cache 是否有变化,一把用作调试,默认是关闭的cacheMutationDetector MutationDetector// 构造 Reflector 需要listerWatcher ListerWatcher// 目标类型,给 Reflector 判断资源类型objectType runtime.Object// Reflector 进行重新同步周期resyncCheckPeriod time.Duration// 如果使用者没有添加 Resync 时间,则使用这个默认的重新同步周期defaultEventHandlerResyncPeriod time.Durationclock                           clock.Clock// 两个 bool 表达了三个状态:controller 启动前、已启动、已停止started, stopped boolstartedLock      sync.Mutex// 当 Pop 正在消费队列,此时新增的 listener 需要加锁,防止消费混乱blockDeltas sync.Mutex// Watch 返回 err 的回调函数watchErrorHandler WatchErrorHandler
}

内部总管家 controller

  • Controller 作为核心中枢,集成了组件 Reflector、DeltaFIFO

  • DeltaFIFO 的消费 HandleDeltas 成为连接下游消费者的桥梁

    1. 用于更新索引Indexer和缓存 Loacl Store
    2. 用于 AddEventHandler 注册的回调函数(AddFunc、UpdateFunc、DeleteFunc)的过滤及处理,然后放入 workqueue,供用户自定义的 Controller (这里指的是用户的业务逻辑)消费使用
  • Controller 由 controller 结构体进行具体实现 —— 这里的 controller 指的是 Informer 内部的控制器:

    • 在 K8s 中约定俗成:大写定义的 interface 接口,由对应小写定义的结构体进行实现。

结构体定义如下:

// k8s.io/client-go/tools/cache/controller.go// 接口定义又哪些行为
// Controller is a generic controller framework.
type Controller interface {Run(stopCh <-chan struct{})HasSynced() boolLastSyncResourceVersion() string
}// controller 的具体实现
// Controller is a generic controller framework.
type controller struct {config         Config           // 包含着 ListAndWatch 函数,DeltaFIFOreflector      *Reflector // Reflector , 用于 ListAndWatchreflectorMutex sync.RWMutexclock          clock.Clock
}// Run 的时候,会创建 Reflector
// Run begins processing items, and will continue until a value is sent down stopCh.
// It's an error to call Run more than once.
// Run blocks; call via go.
func (c *controller) Run(stopCh <-chan struct{}) {defer utilruntime.HandleCrash()go func() {<-stopChc.config.Queue.Close()}()// Reflector 的构建依赖于 Configr := NewReflector(c.config.ListerWatcher, // ListAndWatch 函数c.config.ObjectType,c.config.Queue, // Delta FIFOc.config.FullResyncPeriod,)r.ShouldResync = c.config.ShouldResyncr.clock = c.clockc.reflectorMutex.Lock()c.reflector = r // Reflectorc.reflectorMutex.Unlock()var wg wait.Groupdefer wg.Wait()wg.StartWithChannel(stopCh, r.Run)// processLoop 就是  HandleDeltas 函数// 1. 用于更新索引Indexer和缓存 Loacl Store // 2. 用于 AddEventHandler 注册的回调函数(AddFunc、UpdateFunc、DeleteFunc)的过滤及处理,然后放入 workqueue,供用户自定义的 Controller (这里指的是用户的业务逻辑)消费使用wait.Until(c.processLoop, time.Second, stopCh)
}// 启动 processLoop 不断从 DeltaFIFO Pop 进行消费
// c.config.Process 就是  HandleDeltas 函数,在 config 初始化可以看到
func (c *controller) processLoop() {for {obj, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process))if err != nil {if err == ErrFIFOClosed {return}if c.config.RetryOnError {// This is the safe way to re-enqueue.c.config.Queue.AddIfNotPresent(obj)}}}
}

Controller 中以 goroutine 协程方式启动 Run 方法,会启动 Reflector 的 ListAndWatch(),用于从 apiserver 拉取全量和监听增量资源,存储到 DeltaFIFO。接着,启动 processLoop 不断从 DeltaFIFO Pop 进行消费。在 sharedIndexInformer 中 Pop 出来进行处理的函数是 HandleDeltas,一方面维护 Indexer 的 Add/Update/Delete,另一方面调用下游 sharedProcessor 进行 handler 处理。

连接下游的 HandleDeltas

DeltaFIFO 的消费 HandleDeltas 成为连接下游消费者的桥梁

  1. 用于更新索引Indexer和缓存 Loacl Store
  2. 用于 AddEventHandler 注册的回调函数(AddFunc、UpdateFunc、DeleteFunc)的过滤及处理,然后放入 workqueue,供用户自定义的 Controller (这里指的是用户的业务逻辑)消费使用
// k8s.io/client-go/tools/cache/shared_informer.gofunc (s *sharedIndexInformer) HandleDeltas(obj interface{}) error {s.blockDeltas.Lock()defer s.blockDeltas.Unlock()// from oldest to newestfor _, d := range obj.(Deltas) {switch d.Type {// 资源的同步、添加、更新实践case Sync, Added, Updated:isSync := d.Type == Syncs.cacheMutationDetector.AddObject(d.Object)if old, exists, err := s.indexer.Get(d.Object); err == nil && exists {// 重点!!!更新缓存和索引if err := s.indexer.Update(d.Object); err != nil {return err}// 将事件分发,进行处理s.processor.distribute(updateNotification{oldObj: old, newObj: d.Object}, isSync)} else {// 重点!!!更新缓存和索引if err := s.indexer.Add(d.Object); err != nil {return err}s.processor.distribute(addNotification{newObj: d.Object}, isSync)}// 资源的删除时间case Deleted://  重点!!!更新缓存和索引if err := s.indexer.Delete(d.Object); err != nil {return err}//  重点!!! 通知事件的到来,给订阅的 Informer 发送消息通知// 相应 Informer 的 AddEventHandler 注册的回调函数(AddFunc、UpdateFunc、DeleteFunc)的过滤及处理// 然后放入 workqueue,供用户自定义的 Controller (这里指的是用户的业务逻辑)消费使用s.processor.distribute(deleteNotification{oldObj: d.Object}, false)}}return nil
}

controller 的配置管理 Config

  • 此配置项基本上涵盖了 controller 创建所必须得

    1. Queue —— DeltaFIFO
    2. ListerWatcher —— Reflector 的 ListAndWatch
    3. Process —— 连接下游的 HandleDeltas(事件处理handler,同步缓存和索引)
// staging/src/k8s.io/client-go/tools/cache/controller.go
type Config struct {// 实际由 DeltaFIFO 实现Queue// 构造 Reflector 需要ListerWatcher// Pop 出来的 obj 处理函数   连接下游的 HandleDeltas 函数Process ProcessFunc// 目标对象类型ObjectType runtime.Object// 全量重新同步周期FullResyncPeriod time.Duration// 是否进行重新同步的判断函数ShouldResync ShouldResyncFunc// 如果为 true,Process() 函数返回 err,则再次入队 re-queueRetryOnError bool// Watch 返回 err 的回调函数WatchErrorHandler WatchErrorHandler// Watch 分页大小WatchListPageSize int64
}

controller 管理的 Reflector

Reflector 的主要职责是从 apiserver 拉取并持续监听(ListAndWatch) 相关资源类型的增删改(Add/Update/Delete)事件,存储在由 DeltaFIFO 实现的本地缓存(local Store) 中。

首先看一下 Reflector 结构体定义:

// staging/src/k8s.io/client-go/tools/cache/reflector.go
type Reflector struct {// 通过 file:line 唯一标识的 namename string// 下面三个为了确认类型expectedTypeName stringexpectedType     reflect.TypeexpectedGVK      *schema.GroupVersionKind// 存储 interface: 具体由 DeltaFIFO 实现存储store Store// 用来从 apiserver 拉取全量和增量资源listerWatcher ListerWatcher// 下面两个用来做失败重试backoffManager         wait.BackoffManagerinitConnBackoffManager wait.BackoffManager// informer 使用者重新同步的周期resyncPeriod time.Duration// 判断是否满足可以重新同步的条件ShouldResync func() boolclock clock.Clock// 是否要进行分页 ListpaginatedResult bool// 最后同步的资源版本号,以此为依据,watch 只会监听大于此值的资源lastSyncResourceVersion string// 最后同步的资源版本号是否可用isLastSyncResourceVersionUnavailable bool// 加把锁控制版本号lastSyncResourceVersionMutex sync.RWMutex// 每页大小WatchListPageSize int64// watch 失败回调 handlerwatchErrorHandler WatchErrorHandler
}

从结构体定义可以看到,通过指定目标资源类型进行 ListAndWatch,并可进行分页相关设置。第一次拉取全量资源(目标资源类型) 后通过 syncWith 函数全量替换(Replace) 到 DeltaFIFO queue/items 中,之后通过持续监听 Watch(目标资源类型) 增量事件,并去重更新到 DeltaFIFO queue/items 中,等待被消费。

watch 目标类型通过 Go reflect 反射实现如下:

// staging/src/k8s.io/client-go/tools/cache/reflector.go
// watchHandler watches w and keeps *resourceVersion up to date.
func (r *Reflector) watchHandler(start time.Time, w watch.Interface, resourceVersion *string, errc chan error, stopCh <-chan struct{}) error {...if r.expectedType != nil {if e, a := r.expectedType, reflect.TypeOf(event.Object); e != a {utilruntime.HandleError(fmt.Errorf("%s: expected type %v, but watch event object had type %v", r.name, e, a))continue}}if r.expectedGVK != nil {if e, a := *r.expectedGVK, event.Object.GetObjectKind().GroupVersionKind(); e != a {utilruntime.HandleError(fmt.Errorf("%s: expected gvk %v, but watch event object had gvk %v", r.name, e, a))continue}}...
}
  • 通过反射确认目标资源类型,所以命名为 Reflector 还是比较贴切的;
  • List/Watch 的目标资源类型在NewSharedIndexInformer.ListerWatcher 进行了确定,但 Watch 还会在 watchHandler 中再次比较一下目标类型;

controller 管理的 DeltaFIFO

// staging/src/k8s.io/client-go/tools/cache/delta_fifo.go
type DeltaFIFO struct {// 读写锁、条件变量lock sync.RWMutexcond sync.Cond// kv 存储:objKey1->Deltas[obj1-Added, obj1-Updated...]items map[string]Deltas// 只存储所有 objKeysqueue []string// 是否已经填充:通过 Replace() 接口将第一批对象放入队列,或者第一次调用增、删、改接口时标记为truepopulated bool// 通过 Replace() 接口将第一批对象放入队列的数量initialPopulationCount int// keyFunc 用来从某个 obj 中获取其对应的 objKeykeyFunc KeyFunc// 已知对象,其实就是 IndexerknownObjects KeyListerGetter // 队列是否已经关闭closed bool// 以 Replaced 类型发送(为了兼容老版本的 Sync)emitDeltaTypeReplaced bool
}

DeltaType 可分为以下类型:

// staging/src/k8s.io/client-go/tools/cache/delta_fifo.go
type DeltaType stringconst (Added   DeltaType = "Added"Updated DeltaType = "Updated"Deleted DeltaType = "Deleted"Replaced DeltaType = "Replaced" // 第一次或重新同步Sync DeltaType = "Sync" // 老版本重新同步叫 Sync
)

通过上面的 Reflector 分析可以知道,DeltaFIFO 的职责是通过队列加锁处理(queueActionLocked)、去重(dedupDeltas)、存储在由 DeltaFIFO 实现的本地缓存(local Store) 中,包括 queue(仅存 objKeys) 和 items(存 objKeys 和对应的 Deltas 增量变化),并通过 Pop 不断消费,通过 Process(item) 处理相关逻辑。

Reflector 的 ListAndWatch

// 接口定义
// ListerWatcher is any object that knows how to perform an initial list and start a watch on a resource.
type ListerWatcher interface {Lister  // 该接口 定义了 List 方法Watcher // 该接口 定义了 Watch 方法
}// Lister is any object that knows how to perform an initial list.
type Lister interface {// List should return a list type object; the Items field will be extracted, and the// ResourceVersion field will be used to start the watch in the right place.List(options metav1.ListOptions) (runtime.Object, error)
}// Watcher is any object that knows how to start a watch on a resource.
type Watcher interface {// Watch should begin a watch at the specified version.Watch(options metav1.ListOptions) (watch.Interface, error)
}// 接口的实现
// 接口的作用体 —— ListWatch struct
// ListFunc knows how to list resources
type ListFunc func(options metav1.ListOptions) (runtime.Object, error)// WatchFunc knows how to watch resources
type WatchFunc func(options metav1.ListOptions) (watch.Interface, error)// ListWatch knows how to list and watch a set of apiserver resources.  It satisfies the ListerWatcher interface.
// It is a convenience function for users of NewReflector, etc.
// ListFunc and WatchFunc must not be nil
type ListWatch struct {ListFunc  ListFuncWatchFunc WatchFunc// DisableChunking requests no chunking for this list watcher.DisableChunking bool
}// 作用体 ListWatch struct 的构建
// NewListWatchFromClient creates a new ListWatch from the specified client, resource, namespace and field selector.
func NewListWatchFromClient(c Getter, resource string, namespace string, fieldSelector fields.Selector) *ListWatch {optionsModifier := func(options *metav1.ListOptions) {options.FieldSelector = fieldSelector.String()}return NewFilteredListWatchFromClient(c, resource, namespace, optionsModifier)
}// NewFilteredListWatchFromClient creates a new ListWatch from the specified client, resource, namespace, and option modifier.
// Option modifier is a function takes a ListOptions and modifies the consumed ListOptions. Provide customized modifier function
// to apply modification to ListOptions with a field selector, a label selector, or any other desired options.
func NewFilteredListWatchFromClient(c Getter, resource string, namespace string, optionsModifier func(options *metav1.ListOptions)) *ListWatch {listFunc := func(options metav1.ListOptions) (runtime.Object, error) {optionsModifier(&options)return c.Get().Namespace(namespace).Resource(resource).VersionedParams(&options, metav1.ParameterCodec).Do().Get()}watchFunc := func(options metav1.ListOptions) (watch.Interface, error) {options.Watch = trueoptionsModifier(&options)return c.Get().Namespace(namespace).Resource(resource).VersionedParams(&options, metav1.ParameterCodec).Watch()}return &ListWatch{ListFunc: listFunc, WatchFunc: watchFunc}
}// 方法的实现
// List a set of apiserver resources
func (lw *ListWatch) List(options metav1.ListOptions) (runtime.Object, error) {if !lw.DisableChunking {return pager.New(pager.SimplePageFunc(lw.ListFunc)).List(context.TODO(), options)}return lw.ListFunc(options)
}// Watch a set of apiserver resources
func (lw *ListWatch) Watch(options metav1.ListOptions) (watch.Interface, error) {return lw.WatchFunc(options)
}

sharedIndexInformer 的 Run 函数

  1. 初始化 Config 包含(DeltaFIFO队列、ListAndWatch函数、HandleDeltas函数)
  2. 利用 Config 创建 controller
  3. controller 执行 Run 函数时,会利用 Config 创建 Reflector
// k8s.io/client-go/tools/cache/shared_informer.gofunc (s *sharedIndexInformer) Run(stopCh <-chan struct{}) {defer utilruntime.HandleCrash()// DeltaFIFOfifo := NewDeltaFIFO(MetaNamespaceKeyFunc, s.indexer)// 注意此处的 Configcfg := &Config{Queue:            fifo,                          // DeltaFIFOListerWatcher:    s.listerWatcher,  // ListAndWatch 函数  ObjectType:       s.objectType,FullResyncPeriod: s.resyncCheckPeriod,RetryOnError:     false,ShouldResync:     s.processor.shouldResync,// 重点!!!// 处理 Deltas 的函数,也就是 handler(调用注册的 AddFunc、UpdateFunc、DeleteFunc)// 同时负责同步 索引Indexer 和 缓存 Local StoreProcess: s.HandleDeltas,  }// 使用 Config 创建 共享Informer 内部的 controllerfunc() {s.startedLock.Lock()defer s.startedLock.Unlock()// 创建 共享Informer 内部的 controllers.controller = New(cfg)s.controller.(*controller).clock = s.clocks.started = true}()// Separate stop channel because Processor should be stopped strictly after controllerprocessorStopCh := make(chan struct{})var wg wait.Groupdefer wg.Wait()              // Wait for Processor to stopdefer close(processorStopCh) // Tell Processor to stopwg.StartWithChannel(processorStopCh, s.cacheMutationDetector.Run)wg.StartWithChannel(processorStopCh, s.processor.run)defer func() {s.startedLock.Lock()defer s.startedLock.Unlock()s.stopped = true // Don't want any new listeners}()// Run 的时候会创建 Reflectors.controller.Run(stopCh)
}// k8s.io/client-go/tools/cache/controller.go// 接口定义又哪些行为
// Controller is a generic controller framework.
type Controller interface {Run(stopCh <-chan struct{})HasSynced() boolLastSyncResourceVersion() string
}// controller 的具体实现
// Controller is a generic controller framework.
type controller struct {config         Config           // 包含着 ListAndWatch 函数,DeltaFIFOreflector      *Reflector // Reflector , 用于 ListAndWatchreflectorMutex sync.RWMutexclock          clock.Clock
}// Run 的时候会创建 Reflector
func (c *controller) Run(stopCh <-chan struct{}) {defer utilruntime.HandleCrash()go func() {<-stopChc.config.Queue.Close()}()// Reflector 的构建依赖于 Configr := NewReflector(c.config.ListerWatcher, // ListAndWatch 函数c.config.ObjectType,c.config.Queue, // Delta FIFOc.config.FullResyncPeriod,)r.ShouldResync = c.config.ShouldResyncr.clock = c.clockc.reflectorMutex.Lock()c.reflector = r // Reflectorc.reflectorMutex.Unlock()var wg wait.Groupdefer wg.Wait()wg.StartWithChannel(stopCh, r.Run)// processLoop 就是  HandleDeltas 函数// 1. 用于更新索引Indexer和缓存 Loacl Store // 2. 用于 AddEventHandler 注册的回调函数(AddFunc、UpdateFunc、DeleteFunc)的过滤及处理,然后放入 workqueue,供用户自定义的 Controller (这里指的是用户的业务逻辑)消费使用wait.Until(c.processLoop, time.Second, stopCh)
}// 启动 processLoop 不断从 DeltaFIFO Pop 进行消费
// c.config.Process 就是  HandleDeltas 函数,在 config 初始化可以看到
func (c *controller) processLoop() {for {obj, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process))if err != nil {if err == ErrFIFOClosed {return}if c.config.RetryOnError {// This is the safe way to re-enqueue.c.config.Queue.AddIfNotPresent(obj)}}}
}

【k8s源码篇之Informer篇3】理解Informer中的Reflector组件相关推荐

  1. 第三课 k8s源码学习和二次开发-缓存机制Informers和Reflector组件学习

    第三课 k8s源码学习和二次开发-缓存机制Informers和Reflector组件学习 tags: k8s 源码学习 categories: 源码学习 二次开发 文章目录 第三课 k8s源码学习和二 ...

  2. 第十四课 k8s源码学习和二次开发原理篇-调度器原理

    第十四课 k8s源码学习和二次开发原理篇-调度器原理 tags: k8s 源码学习 categories: 源码学习 二次开发 文章目录 第十四课 k8s源码学习和二次开发原理篇-调度器原理 第一节 ...

  3. 第八课 k8s源码学习和二次开发原理篇-KubeBuilder使用和Controller-runtime原理

    第八课 k8s源码学习和二次开发原理篇-KubeBuilder使用和Controller-runtime原理 tags: k8s 源码学习 categories: 源码学习 二次开发 文章目录 第八课 ...

  4. 【k8s源码篇】k8s类型定义1之yaml与单体、list对象转换

    API 类型定义 参考: 深入剖析kubernetes的API对象类型定义 K8s源码分析(2)-Resource Meta 在kubernetes里提供了非常多的API对象,它们被定义在k8s.io ...

  5. hadoop作业初始化过程详解(源码分析第三篇)

    (一)概述 我们在上一篇blog已经详细的分析了一个作业从用户输入提交命令到到达JobTracker之前的各个过程.在作业到达JobTracker之后初始化之前,JobTracker会通过submit ...

  6. 详解linux下auto工具制作Makefile源码包(制作篇)

    2019独角兽企业重金招聘Python工程师标准>>> 详解linux下auto工具制作Makefile源码包(制作篇) 水木杨 一.     概述 为了更好的制作configure ...

  7. Android源码解析(一)动画篇-- Animator属性动画系统

    Android源码解析-动画篇 Android源码解析(一)动画篇-- Animator属性动画系统 Android源码解析(二)动画篇-- ObjectAnimator Android在3.0版本中 ...

  8. Markdown编辑器:纯前端演示(可接入项目、含源码下载) - 总结篇

    可接入项目,提供全部代码下载. 通过本地html静态文件,演示效果. Editor.md是一款开源的.可嵌入的 Markdown 在线编辑器(组件),基于 CodeMirror.jQuery 和 Ma ...

  9. Soul网关源码阅读番外篇(一) HTTP参数请求错误

    Soul网关源码阅读番外篇(一) HTTP参数请求错误 共同作者:石立 萧 * 简介     在Soul网关2.2.1版本源码阅读中,遇到了HTTP请求加上参数返回404的错误,此篇文章基于此进行探索 ...

  10. Kubernetes Node Controller源码分析之配置篇

    2019独角兽企业重金招聘Python工程师标准>>> Author: xidianwangtao@gmail.com Kubernetes Node Controller源码分析之 ...

最新文章

  1. 在.NET2.0中解析Json和Xml
  2. 【Vegas原创】outlook发送时,报550 5.7.1 client does not have permissions to send as this sender解决方法...
  3. FreeRTOS临界区应用与总结
  4. 通讯录新建分组功能php,微信通讯录分组怎么设置
  5. 如何将hive与mysql连接_hive连接mysql配置
  6. mysql hint 简书_MySQL
  7. 安卓系统曝漏洞!有人可能正在用你的手机秘密拍照
  8. Linux常用命令大全(三)
  9. Java集合Set、Map、HashSet、HashMap、TreeSet、TreeMap等
  10. Spark算子---实战应用
  11. 挑战王者荣耀“绝悟” AI,会进化的职业选手太恐怖了!
  12. java 上传文件编码_java文件传输之文件编码和File类的使用
  13. 【SPSS】SPSS之主成分分析及因子分析
  14. postman使用教程(1)--发送post请求
  15. android课程设计体重测量仪,数字身高体重测量仪毕业设计样本.docx
  16. 路在何方 路在脚下 -- !!
  17. layuiadmin上手好难_滑步车比赛好拍吗?
  18. 双摄像头做slsm_刚刚考完!真实双机位复试经验帮你避雷!
  19. 学会使用Composer
  20. 医疗器械小程序或手机APP软件开发方案

热门文章

  1. 播音主持艺考培训:气息练习有多重要?
  2. CentOS7下安装Python3,超详细完整教程
  3. 自然语言处理(NLP)-第三方库(工具包):WordNet(在nltk.corpus下)【英文:同义词、反义词、蕴含关系、语义相似度】
  4. html路由路径,苹果cms默认路由规则路径
  5. Python 实现注意力机制
  6. 记tomcat8 读取mysql longblob类型文本 乱码问题(实则UTF-8与GBK 混淆)
  7. 使用elementui踩坑(实则手贱),组件库显示不出来,需要点击多次才会出现的bug
  8. keyframes动画效果
  9. 想知道次世代建模吗?
  10. Java Excel转换PDF