【k8s源码篇之Informer篇3】理解Informer中的Reflector组件
参考
- (三)Kubernetes 源码剖析之学习Informer机制
- 如何高效掌控K8s资源变化?K8s Informer实现机制浅析
- 25 | 深入解析声明式API(二):编写自定义控制器
- k8s client-go informer中的processorlistener数据消费,缓存的分析
架构
Informer 和 Controller
- 便于理解的架构图
- 这里 Indexer「索引」 和 Local Store 「缓存」 是分开表示的
- 在源码级别,基本上是一起实现的,一个结构体内涵盖
Informer 简要架构
- 源码级简要理解
Informer 详细架构
- 源码级详细理解
Reflector
// staging/src/k8s.io/client-go/tools/cache/shared_informer.gotype sharedIndexInformer struct {// 索引和 缓存 storeindexer Indexer// Informer 内部的 controller,不是我们自定义的 Controllercontroller Controller// 处理函数,将是重点processor *sharedProcessor// 检测 cache 是否有变化,一把用作调试,默认是关闭的cacheMutationDetector MutationDetector// 构造 Reflector 需要listerWatcher ListerWatcher// 目标类型,给 Reflector 判断资源类型objectType runtime.Object// Reflector 进行重新同步周期resyncCheckPeriod time.Duration// 如果使用者没有添加 Resync 时间,则使用这个默认的重新同步周期defaultEventHandlerResyncPeriod time.Durationclock clock.Clock// 两个 bool 表达了三个状态:controller 启动前、已启动、已停止started, stopped boolstartedLock sync.Mutex// 当 Pop 正在消费队列,此时新增的 listener 需要加锁,防止消费混乱blockDeltas sync.Mutex// Watch 返回 err 的回调函数watchErrorHandler WatchErrorHandler
}
内部总管家 controller
Controller 作为核心中枢,集成了组件 Reflector、DeltaFIFO
DeltaFIFO 的消费
HandleDeltas
成为连接下游消费者的桥梁- 用于更新索引Indexer和缓存 Loacl Store
- 用于 AddEventHandler 注册的回调函数(AddFunc、UpdateFunc、DeleteFunc)的过滤及处理,然后放入 workqueue,供用户自定义的 Controller (这里指的是用户的业务逻辑)消费使用
Controller 由 controller 结构体进行具体实现 —— 这里的 controller 指的是 Informer 内部的控制器:
- 在 K8s 中约定俗成:大写定义的 interface 接口,由对应小写定义的结构体进行实现。
结构体定义如下:
// k8s.io/client-go/tools/cache/controller.go// 接口定义又哪些行为
// Controller is a generic controller framework.
type Controller interface {Run(stopCh <-chan struct{})HasSynced() boolLastSyncResourceVersion() string
}// controller 的具体实现
// Controller is a generic controller framework.
type controller struct {config Config // 包含着 ListAndWatch 函数,DeltaFIFOreflector *Reflector // Reflector , 用于 ListAndWatchreflectorMutex sync.RWMutexclock clock.Clock
}// Run 的时候,会创建 Reflector
// Run begins processing items, and will continue until a value is sent down stopCh.
// It's an error to call Run more than once.
// Run blocks; call via go.
func (c *controller) Run(stopCh <-chan struct{}) {defer utilruntime.HandleCrash()go func() {<-stopChc.config.Queue.Close()}()// Reflector 的构建依赖于 Configr := NewReflector(c.config.ListerWatcher, // ListAndWatch 函数c.config.ObjectType,c.config.Queue, // Delta FIFOc.config.FullResyncPeriod,)r.ShouldResync = c.config.ShouldResyncr.clock = c.clockc.reflectorMutex.Lock()c.reflector = r // Reflectorc.reflectorMutex.Unlock()var wg wait.Groupdefer wg.Wait()wg.StartWithChannel(stopCh, r.Run)// processLoop 就是 HandleDeltas 函数// 1. 用于更新索引Indexer和缓存 Loacl Store // 2. 用于 AddEventHandler 注册的回调函数(AddFunc、UpdateFunc、DeleteFunc)的过滤及处理,然后放入 workqueue,供用户自定义的 Controller (这里指的是用户的业务逻辑)消费使用wait.Until(c.processLoop, time.Second, stopCh)
}// 启动 processLoop 不断从 DeltaFIFO Pop 进行消费
// c.config.Process 就是 HandleDeltas 函数,在 config 初始化可以看到
func (c *controller) processLoop() {for {obj, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process))if err != nil {if err == ErrFIFOClosed {return}if c.config.RetryOnError {// This is the safe way to re-enqueue.c.config.Queue.AddIfNotPresent(obj)}}}
}
Controller 中以 goroutine 协程方式启动 Run 方法,会启动 Reflector 的 ListAndWatch(),用于从 apiserver 拉取全量和监听增量资源,存储到 DeltaFIFO。接着,启动 processLoop 不断从 DeltaFIFO Pop 进行消费。在 sharedIndexInformer 中 Pop 出来进行处理的函数是 HandleDeltas,一方面维护 Indexer 的 Add/Update/Delete,另一方面调用下游 sharedProcessor 进行 handler 处理。
连接下游的 HandleDeltas
DeltaFIFO 的消费 HandleDeltas
成为连接下游消费者的桥梁
- 用于更新索引Indexer和缓存 Loacl Store
- 用于 AddEventHandler 注册的回调函数(AddFunc、UpdateFunc、DeleteFunc)的过滤及处理,然后放入 workqueue,供用户自定义的 Controller (这里指的是用户的业务逻辑)消费使用
// k8s.io/client-go/tools/cache/shared_informer.gofunc (s *sharedIndexInformer) HandleDeltas(obj interface{}) error {s.blockDeltas.Lock()defer s.blockDeltas.Unlock()// from oldest to newestfor _, d := range obj.(Deltas) {switch d.Type {// 资源的同步、添加、更新实践case Sync, Added, Updated:isSync := d.Type == Syncs.cacheMutationDetector.AddObject(d.Object)if old, exists, err := s.indexer.Get(d.Object); err == nil && exists {// 重点!!!更新缓存和索引if err := s.indexer.Update(d.Object); err != nil {return err}// 将事件分发,进行处理s.processor.distribute(updateNotification{oldObj: old, newObj: d.Object}, isSync)} else {// 重点!!!更新缓存和索引if err := s.indexer.Add(d.Object); err != nil {return err}s.processor.distribute(addNotification{newObj: d.Object}, isSync)}// 资源的删除时间case Deleted:// 重点!!!更新缓存和索引if err := s.indexer.Delete(d.Object); err != nil {return err}// 重点!!! 通知事件的到来,给订阅的 Informer 发送消息通知// 相应 Informer 的 AddEventHandler 注册的回调函数(AddFunc、UpdateFunc、DeleteFunc)的过滤及处理// 然后放入 workqueue,供用户自定义的 Controller (这里指的是用户的业务逻辑)消费使用s.processor.distribute(deleteNotification{oldObj: d.Object}, false)}}return nil
}
controller 的配置管理 Config
- 此配置项基本上涵盖了 controller 创建所必须得
- Queue —— DeltaFIFO
- ListerWatcher —— Reflector 的 ListAndWatch
- Process —— 连接下游的 HandleDeltas(事件处理handler,同步缓存和索引)
// staging/src/k8s.io/client-go/tools/cache/controller.go
type Config struct {// 实际由 DeltaFIFO 实现Queue// 构造 Reflector 需要ListerWatcher// Pop 出来的 obj 处理函数 连接下游的 HandleDeltas 函数Process ProcessFunc// 目标对象类型ObjectType runtime.Object// 全量重新同步周期FullResyncPeriod time.Duration// 是否进行重新同步的判断函数ShouldResync ShouldResyncFunc// 如果为 true,Process() 函数返回 err,则再次入队 re-queueRetryOnError bool// Watch 返回 err 的回调函数WatchErrorHandler WatchErrorHandler// Watch 分页大小WatchListPageSize int64
}
controller 管理的 Reflector
Reflector 的主要职责是从 apiserver 拉取并持续监听(ListAndWatch) 相关资源类型的增删改(Add/Update/Delete)事件,存储在由 DeltaFIFO 实现的本地缓存(local Store) 中。
首先看一下 Reflector 结构体定义:
// staging/src/k8s.io/client-go/tools/cache/reflector.go
type Reflector struct {// 通过 file:line 唯一标识的 namename string// 下面三个为了确认类型expectedTypeName stringexpectedType reflect.TypeexpectedGVK *schema.GroupVersionKind// 存储 interface: 具体由 DeltaFIFO 实现存储store Store// 用来从 apiserver 拉取全量和增量资源listerWatcher ListerWatcher// 下面两个用来做失败重试backoffManager wait.BackoffManagerinitConnBackoffManager wait.BackoffManager// informer 使用者重新同步的周期resyncPeriod time.Duration// 判断是否满足可以重新同步的条件ShouldResync func() boolclock clock.Clock// 是否要进行分页 ListpaginatedResult bool// 最后同步的资源版本号,以此为依据,watch 只会监听大于此值的资源lastSyncResourceVersion string// 最后同步的资源版本号是否可用isLastSyncResourceVersionUnavailable bool// 加把锁控制版本号lastSyncResourceVersionMutex sync.RWMutex// 每页大小WatchListPageSize int64// watch 失败回调 handlerwatchErrorHandler WatchErrorHandler
}
从结构体定义可以看到,通过指定目标资源类型进行 ListAndWatch,并可进行分页相关设置。第一次拉取全量资源(目标资源类型) 后通过 syncWith 函数全量替换(Replace) 到 DeltaFIFO queue/items 中,之后通过持续监听 Watch(目标资源类型) 增量事件,并去重更新到 DeltaFIFO queue/items 中,等待被消费。
watch 目标类型通过 Go reflect 反射实现如下:
// staging/src/k8s.io/client-go/tools/cache/reflector.go
// watchHandler watches w and keeps *resourceVersion up to date.
func (r *Reflector) watchHandler(start time.Time, w watch.Interface, resourceVersion *string, errc chan error, stopCh <-chan struct{}) error {...if r.expectedType != nil {if e, a := r.expectedType, reflect.TypeOf(event.Object); e != a {utilruntime.HandleError(fmt.Errorf("%s: expected type %v, but watch event object had type %v", r.name, e, a))continue}}if r.expectedGVK != nil {if e, a := *r.expectedGVK, event.Object.GetObjectKind().GroupVersionKind(); e != a {utilruntime.HandleError(fmt.Errorf("%s: expected gvk %v, but watch event object had gvk %v", r.name, e, a))continue}}...
}
- 通过反射确认目标资源类型,所以命名为 Reflector 还是比较贴切的;
- List/Watch 的目标资源类型在NewSharedIndexInformer.ListerWatcher 进行了确定,但 Watch 还会在 watchHandler 中再次比较一下目标类型;
controller 管理的 DeltaFIFO
// staging/src/k8s.io/client-go/tools/cache/delta_fifo.go
type DeltaFIFO struct {// 读写锁、条件变量lock sync.RWMutexcond sync.Cond// kv 存储:objKey1->Deltas[obj1-Added, obj1-Updated...]items map[string]Deltas// 只存储所有 objKeysqueue []string// 是否已经填充:通过 Replace() 接口将第一批对象放入队列,或者第一次调用增、删、改接口时标记为truepopulated bool// 通过 Replace() 接口将第一批对象放入队列的数量initialPopulationCount int// keyFunc 用来从某个 obj 中获取其对应的 objKeykeyFunc KeyFunc// 已知对象,其实就是 IndexerknownObjects KeyListerGetter // 队列是否已经关闭closed bool// 以 Replaced 类型发送(为了兼容老版本的 Sync)emitDeltaTypeReplaced bool
}
DeltaType 可分为以下类型:
// staging/src/k8s.io/client-go/tools/cache/delta_fifo.go
type DeltaType stringconst (Added DeltaType = "Added"Updated DeltaType = "Updated"Deleted DeltaType = "Deleted"Replaced DeltaType = "Replaced" // 第一次或重新同步Sync DeltaType = "Sync" // 老版本重新同步叫 Sync
)
通过上面的 Reflector 分析可以知道,DeltaFIFO 的职责是通过队列加锁处理(queueActionLocked)、去重(dedupDeltas)、存储在由 DeltaFIFO 实现的本地缓存(local Store) 中,包括 queue(仅存 objKeys) 和 items(存 objKeys 和对应的 Deltas 增量变化),并通过 Pop 不断消费,通过 Process(item) 处理相关逻辑。
Reflector 的 ListAndWatch
// 接口定义
// ListerWatcher is any object that knows how to perform an initial list and start a watch on a resource.
type ListerWatcher interface {Lister // 该接口 定义了 List 方法Watcher // 该接口 定义了 Watch 方法
}// Lister is any object that knows how to perform an initial list.
type Lister interface {// List should return a list type object; the Items field will be extracted, and the// ResourceVersion field will be used to start the watch in the right place.List(options metav1.ListOptions) (runtime.Object, error)
}// Watcher is any object that knows how to start a watch on a resource.
type Watcher interface {// Watch should begin a watch at the specified version.Watch(options metav1.ListOptions) (watch.Interface, error)
}// 接口的实现
// 接口的作用体 —— ListWatch struct
// ListFunc knows how to list resources
type ListFunc func(options metav1.ListOptions) (runtime.Object, error)// WatchFunc knows how to watch resources
type WatchFunc func(options metav1.ListOptions) (watch.Interface, error)// ListWatch knows how to list and watch a set of apiserver resources. It satisfies the ListerWatcher interface.
// It is a convenience function for users of NewReflector, etc.
// ListFunc and WatchFunc must not be nil
type ListWatch struct {ListFunc ListFuncWatchFunc WatchFunc// DisableChunking requests no chunking for this list watcher.DisableChunking bool
}// 作用体 ListWatch struct 的构建
// NewListWatchFromClient creates a new ListWatch from the specified client, resource, namespace and field selector.
func NewListWatchFromClient(c Getter, resource string, namespace string, fieldSelector fields.Selector) *ListWatch {optionsModifier := func(options *metav1.ListOptions) {options.FieldSelector = fieldSelector.String()}return NewFilteredListWatchFromClient(c, resource, namespace, optionsModifier)
}// NewFilteredListWatchFromClient creates a new ListWatch from the specified client, resource, namespace, and option modifier.
// Option modifier is a function takes a ListOptions and modifies the consumed ListOptions. Provide customized modifier function
// to apply modification to ListOptions with a field selector, a label selector, or any other desired options.
func NewFilteredListWatchFromClient(c Getter, resource string, namespace string, optionsModifier func(options *metav1.ListOptions)) *ListWatch {listFunc := func(options metav1.ListOptions) (runtime.Object, error) {optionsModifier(&options)return c.Get().Namespace(namespace).Resource(resource).VersionedParams(&options, metav1.ParameterCodec).Do().Get()}watchFunc := func(options metav1.ListOptions) (watch.Interface, error) {options.Watch = trueoptionsModifier(&options)return c.Get().Namespace(namespace).Resource(resource).VersionedParams(&options, metav1.ParameterCodec).Watch()}return &ListWatch{ListFunc: listFunc, WatchFunc: watchFunc}
}// 方法的实现
// List a set of apiserver resources
func (lw *ListWatch) List(options metav1.ListOptions) (runtime.Object, error) {if !lw.DisableChunking {return pager.New(pager.SimplePageFunc(lw.ListFunc)).List(context.TODO(), options)}return lw.ListFunc(options)
}// Watch a set of apiserver resources
func (lw *ListWatch) Watch(options metav1.ListOptions) (watch.Interface, error) {return lw.WatchFunc(options)
}
sharedIndexInformer 的 Run 函数
- 初始化 Config 包含(DeltaFIFO队列、ListAndWatch函数、HandleDeltas函数)
- 利用 Config 创建 controller
- controller 执行 Run 函数时,会利用 Config 创建 Reflector
// k8s.io/client-go/tools/cache/shared_informer.gofunc (s *sharedIndexInformer) Run(stopCh <-chan struct{}) {defer utilruntime.HandleCrash()// DeltaFIFOfifo := NewDeltaFIFO(MetaNamespaceKeyFunc, s.indexer)// 注意此处的 Configcfg := &Config{Queue: fifo, // DeltaFIFOListerWatcher: s.listerWatcher, // ListAndWatch 函数 ObjectType: s.objectType,FullResyncPeriod: s.resyncCheckPeriod,RetryOnError: false,ShouldResync: s.processor.shouldResync,// 重点!!!// 处理 Deltas 的函数,也就是 handler(调用注册的 AddFunc、UpdateFunc、DeleteFunc)// 同时负责同步 索引Indexer 和 缓存 Local StoreProcess: s.HandleDeltas, }// 使用 Config 创建 共享Informer 内部的 controllerfunc() {s.startedLock.Lock()defer s.startedLock.Unlock()// 创建 共享Informer 内部的 controllers.controller = New(cfg)s.controller.(*controller).clock = s.clocks.started = true}()// Separate stop channel because Processor should be stopped strictly after controllerprocessorStopCh := make(chan struct{})var wg wait.Groupdefer wg.Wait() // Wait for Processor to stopdefer close(processorStopCh) // Tell Processor to stopwg.StartWithChannel(processorStopCh, s.cacheMutationDetector.Run)wg.StartWithChannel(processorStopCh, s.processor.run)defer func() {s.startedLock.Lock()defer s.startedLock.Unlock()s.stopped = true // Don't want any new listeners}()// Run 的时候会创建 Reflectors.controller.Run(stopCh)
}// k8s.io/client-go/tools/cache/controller.go// 接口定义又哪些行为
// Controller is a generic controller framework.
type Controller interface {Run(stopCh <-chan struct{})HasSynced() boolLastSyncResourceVersion() string
}// controller 的具体实现
// Controller is a generic controller framework.
type controller struct {config Config // 包含着 ListAndWatch 函数,DeltaFIFOreflector *Reflector // Reflector , 用于 ListAndWatchreflectorMutex sync.RWMutexclock clock.Clock
}// Run 的时候会创建 Reflector
func (c *controller) Run(stopCh <-chan struct{}) {defer utilruntime.HandleCrash()go func() {<-stopChc.config.Queue.Close()}()// Reflector 的构建依赖于 Configr := NewReflector(c.config.ListerWatcher, // ListAndWatch 函数c.config.ObjectType,c.config.Queue, // Delta FIFOc.config.FullResyncPeriod,)r.ShouldResync = c.config.ShouldResyncr.clock = c.clockc.reflectorMutex.Lock()c.reflector = r // Reflectorc.reflectorMutex.Unlock()var wg wait.Groupdefer wg.Wait()wg.StartWithChannel(stopCh, r.Run)// processLoop 就是 HandleDeltas 函数// 1. 用于更新索引Indexer和缓存 Loacl Store // 2. 用于 AddEventHandler 注册的回调函数(AddFunc、UpdateFunc、DeleteFunc)的过滤及处理,然后放入 workqueue,供用户自定义的 Controller (这里指的是用户的业务逻辑)消费使用wait.Until(c.processLoop, time.Second, stopCh)
}// 启动 processLoop 不断从 DeltaFIFO Pop 进行消费
// c.config.Process 就是 HandleDeltas 函数,在 config 初始化可以看到
func (c *controller) processLoop() {for {obj, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process))if err != nil {if err == ErrFIFOClosed {return}if c.config.RetryOnError {// This is the safe way to re-enqueue.c.config.Queue.AddIfNotPresent(obj)}}}
}
【k8s源码篇之Informer篇3】理解Informer中的Reflector组件相关推荐
- 第三课 k8s源码学习和二次开发-缓存机制Informers和Reflector组件学习
第三课 k8s源码学习和二次开发-缓存机制Informers和Reflector组件学习 tags: k8s 源码学习 categories: 源码学习 二次开发 文章目录 第三课 k8s源码学习和二 ...
- 第十四课 k8s源码学习和二次开发原理篇-调度器原理
第十四课 k8s源码学习和二次开发原理篇-调度器原理 tags: k8s 源码学习 categories: 源码学习 二次开发 文章目录 第十四课 k8s源码学习和二次开发原理篇-调度器原理 第一节 ...
- 第八课 k8s源码学习和二次开发原理篇-KubeBuilder使用和Controller-runtime原理
第八课 k8s源码学习和二次开发原理篇-KubeBuilder使用和Controller-runtime原理 tags: k8s 源码学习 categories: 源码学习 二次开发 文章目录 第八课 ...
- 【k8s源码篇】k8s类型定义1之yaml与单体、list对象转换
API 类型定义 参考: 深入剖析kubernetes的API对象类型定义 K8s源码分析(2)-Resource Meta 在kubernetes里提供了非常多的API对象,它们被定义在k8s.io ...
- hadoop作业初始化过程详解(源码分析第三篇)
(一)概述 我们在上一篇blog已经详细的分析了一个作业从用户输入提交命令到到达JobTracker之前的各个过程.在作业到达JobTracker之后初始化之前,JobTracker会通过submit ...
- 详解linux下auto工具制作Makefile源码包(制作篇)
2019独角兽企业重金招聘Python工程师标准>>> 详解linux下auto工具制作Makefile源码包(制作篇) 水木杨 一. 概述 为了更好的制作configure ...
- Android源码解析(一)动画篇-- Animator属性动画系统
Android源码解析-动画篇 Android源码解析(一)动画篇-- Animator属性动画系统 Android源码解析(二)动画篇-- ObjectAnimator Android在3.0版本中 ...
- Markdown编辑器:纯前端演示(可接入项目、含源码下载) - 总结篇
可接入项目,提供全部代码下载. 通过本地html静态文件,演示效果. Editor.md是一款开源的.可嵌入的 Markdown 在线编辑器(组件),基于 CodeMirror.jQuery 和 Ma ...
- Soul网关源码阅读番外篇(一) HTTP参数请求错误
Soul网关源码阅读番外篇(一) HTTP参数请求错误 共同作者:石立 萧 * 简介 在Soul网关2.2.1版本源码阅读中,遇到了HTTP请求加上参数返回404的错误,此篇文章基于此进行探索 ...
- Kubernetes Node Controller源码分析之配置篇
2019独角兽企业重金招聘Python工程师标准>>> Author: xidianwangtao@gmail.com Kubernetes Node Controller源码分析之 ...
最新文章
- 在.NET2.0中解析Json和Xml
- 【Vegas原创】outlook发送时,报550 5.7.1 client does not have permissions to send as this sender解决方法...
- FreeRTOS临界区应用与总结
- 通讯录新建分组功能php,微信通讯录分组怎么设置
- 如何将hive与mysql连接_hive连接mysql配置
- mysql hint 简书_MySQL
- 安卓系统曝漏洞!有人可能正在用你的手机秘密拍照
- Linux常用命令大全(三)
- Java集合Set、Map、HashSet、HashMap、TreeSet、TreeMap等
- Spark算子---实战应用
- 挑战王者荣耀“绝悟” AI,会进化的职业选手太恐怖了!
- java 上传文件编码_java文件传输之文件编码和File类的使用
- 【SPSS】SPSS之主成分分析及因子分析
- postman使用教程(1)--发送post请求
- android课程设计体重测量仪,数字身高体重测量仪毕业设计样本.docx
- 路在何方 路在脚下 -- !!
- layuiadmin上手好难_滑步车比赛好拍吗?
- 双摄像头做slsm_刚刚考完!真实双机位复试经验帮你避雷!
- 学会使用Composer
- 医疗器械小程序或手机APP软件开发方案
热门文章
- 播音主持艺考培训:气息练习有多重要?
- CentOS7下安装Python3,超详细完整教程
- 自然语言处理(NLP)-第三方库(工具包):WordNet(在nltk.corpus下)【英文:同义词、反义词、蕴含关系、语义相似度】
- html路由路径,苹果cms默认路由规则路径
- Python 实现注意力机制
- 记tomcat8 读取mysql longblob类型文本 乱码问题(实则UTF-8与GBK 混淆)
- 使用elementui踩坑(实则手贱),组件库显示不出来,需要点击多次才会出现的bug
- keyframes动画效果
- 想知道次世代建模吗?
- Java Excel转换PDF