介绍

MCP是基于订阅的配置分发API。配置使用者(即sink)从配置生产者(即source)请求更新资源集合.添加,更新或删除资源时,source会将资源更新推送到sink.sink积极确认资源更新,如果sink接受,则返回ACK,如果被拒绝则返回NACK,例如: 因为资源无效。一旦对先前的更新进行了ACK/NACK,则源可以推送其他更新.该源一次只能运行一个未完成的更新(每个集合).

MCP是一对双向流gRPC API服务(ResourceSource和ResourceSink)。

  • 当source是服务器而sink是客户端时,将使用ResourceSource服务.默认情况下,Galley实现ResourceSource服务,并且Pilot/Mixer连接作为客户端。

  • 当source是客户端,而接收器是服务器时,将使用ResourceSink服务.可以将Galley配置为可选地dial-out到远程配置sink,例如 Pilot位于另一个群集中,在该群集中,它不能作为客户端启动与Galley的连接.在这种情况下,Pilot将实现ResourceSink服务,而Galley将作为客户端进行连接。

就消息交换而言,ResourceSource和ResourceSink在语义上是等效的.唯一有意义的区别是谁启动连接并打开grpc流。

数据模型

MCP是一种传输机制,可以通过管理器组件配置先导和混合器.MCP定义了每种资源的通用元数据格式,而资源特定的内容则在其他位置定义(例如https://github.com/istio/api/tree/master/networking/v1alpha3).

Collections

相同类型的资源被组织到命名集合中.Istio API集合名称的格式为istio///,其中,和<api由API样式准则定义.例如,VirtualService的collection名称为istio/networking/v1alpha3/virtualservices。

元数据

建立连接

ResourceSource服务-客户端是reource sink.客户端dail服务器并建立新的gRPC流.客户端发送RequestResources并接收Resources消息。

ResourceSink服务-客户端是资源源.客户端拨打服务器并建立新的gRPC流.服务器发送RequestResources并接收Resources消息。

配置升级

以下概述适用于ResourceSink和ResourceSource服务,无论客户端/服务器角色如何。

资源更新协议由增量xDS协议派生。除了资源提示已被删除之外,协议交换几乎是相同的。下面的大多数文本和图表都是从增量xDS文档中复制并进行相应调整的。

在MCP中,资源首先按collection进行组织。在每个collection中,资源可以通过元数据名称唯一地标识。对各个资源进行版本控制,以区分同一命名资源的较新版本。

可以在两种情况下发送RequestResource消息:

  • MCP双向更改流中的初始消息

  • 作为对先前资源消息的ACK或NACK响应。在这种情况下,response_nonce设置为资源消息中的现时值.ACK/NACK由后续请求中是否存在error_detail决定。

初始的RequestResources消息包括与所订阅的资源集(例如VirtualService)相对应的集合,节点接收器标识符和nonce字段以及initial_resource_version(稍后会详细介绍)。当请求的资源可用时,source将发送资源消息。处理资源消息后,sink在流上发送新的RequestResources消息,指定成功应用的最后一个版本以及源提供的随机数。

随机数字段用于将每个集合的RequestResources和Resources消息配对。源一次只能发送一个未完成的资源消息(每个collection),并等待接收器进行ACK/NACK。接收到更新后,接收器将在解码,验证更新并将更新持久保存到其内部配置存储后,期望相对较快地发送ACK/NACK。

source应忽略具有过期和未知随机数的请求,这些请求与最近发送的Resource消息中的随机数不匹配。

成功示例

以下示例显示接收器接收到已成功ACK的一系列更改。

以下示例显示了与增量更新一起交付的所需资源.此示例假定source支持增量.当source不支持增量更新时,考虑到接收器是否请求增量更新,推送的资源将始终将增量设置为false.在任何时候,源都可以决定推送完整状态更新,而不必考虑接收器的请求.双方必须协商(即同意)在每个请求/响应的基础上使用增量,以增量发送更新。

错误示例

以下示例显示了无法应用更改时发生的情况

接收器仅在特殊情况下应为NACK。例如,如果一组资源无效,格式错误或无法解码。NACK的更新应发出警报,以供人类随后进行调查.源不应该重新发送先前NACK相同的资源集.在将金丝雀推送到更大数量的资源接收器之前,也可以将金丝雀推送到专用接收器,以验证正确性(非NACK)。

MCP中的随机数用于匹配RequestResources和Resource。在重新连接时,接收器可以通过为每个集合指定带有initial_resource_version的已知资源版本来尝试恢复与同一源的会话。

mcp实现探

接下来以官方的测试用例分析官方的mcp实现,代码地址:https://github.com/istio/istio/blob/master/pkg/mcp/testing/server.go
对于mcpsource通过

mcp.RegisterResourceSourceServer(gs, srcServer)

进行注册

Server.EstablishResourceStream

获取客户端信息,并进行鉴权,交给ProcessStream进行处理

func (s *Server) EstablishResourceStream(stream mcp.ResourceSource_EstablishResourceStreamServer) error {if s.rateLimiter != nil {if err := s.rateLimiter.Wait(stream.Context()); err != nil {return err}}var authInfo credentials.AuthInfoif peerInfo, ok := peer.FromContext(stream.Context()); ok { //获取客户端信息authInfo = peerInfo.AuthInfo} else {scope.Warnf("No peer info found on the incoming stream.")}if err := s.authCheck.Check(authInfo); err != nil { // 认证return status.Errorf(codes.Unauthenticated, "Authentication failure: %v", err)}if err := stream.SendHeader(s.metadata); err != nil {return err}err := s.src.ProcessStream(stream) code := status.Code(err)if code == codes.OK || code == codes.Canceled || err == io.EOF {return nil}return err
}

Source.ProcessStream

  • s.newConnection(stream)

初始化connection,

con := &connection{stream:   stream,peerAddr: peerAddr,requestC: make(chan *mcp.RequestResources),watches:  make(map[string]*watch),watcher:  s.watcher,id:       atomic.AddInt64(&s.nextStreamID, 1),reporter: s.reporter,limiter:  s.requestLimiter.Create(),queue:    internal.NewUniqueScheduledQueue(len(s.collections)),
}

watcher即为Server.src的watcher

func NewServer(srcOptions *Options, serverOptions *ServerOptions) *Server {s := &Server{src:         New(srcOptions),authCheck:   serverOptions.AuthChecker,rateLimiter: serverOptions.RateLimiter,metadata:    serverOptions.Metadata,}return s
}
  • con.receive()

通过receive不断将数据写入requestC channel

  • 从requestC 读取请求
case req, more := <-con.requestC:if !more {return con.reqError}if con.limiter != nil {if err := con.limiter.Wait(stream.Context()); err != nil {return err}}if err := con.processClientRequest(req); err != nil {return err}
  • 响应入队
func (con *connection) queueResponse(resp *WatchResponse) {if resp == nil {con.queue.Close()} else {con.queue.Enqueue(resp.Collection, resp) //响应入队}
}
  • 响应出队

从queue中获取需要返回的response,返回给sink

collection, item, ok := con.queue.Dequeue() // 从queue读取处理后的返回结果
if !ok {break
}resp := item.(*WatchResponse)w, ok := con.watches[collection]
if !ok {scope.Errorf("unknown collection in dequeued watch response: %v", collection)break // bug?
}// the response may have been cleared before we got to it
if resp != nil {if err := con.pushServerResponse(w, resp); err != nil { //通过pushServerResponse 将响应发送给sinkreturn err}
}

connection.processClientRequest

通过调用Watch方法处理请求

sr := &Request{SinkNode:    req.SinkNode,Collection:  collection,VersionInfo: versionInfo,incremental: req.Incremental,}
w.cancel = con.watcher.Watch(sr, con.queueResponse, con.peerAddr)

由此我们可以看出我们实现一个mcp的核心在于实现一个watcher,传递给mcp source server

type Watcher interface {Watch(*Request, PushResponseFunc, string) CancelWatchFunc
}

官方mcp watcher 实现

pkg/mcp/testing/server.go

cache := snapshot.New(groups.DefaultIndexFn)

重点关注cache.Watch

func (c *Cache) Watch(request *source.Request,pushResponse source.PushResponseFunc,peerAddr string) source.CancelWatchFunc {group := c.groupIndex(request.Collection, request.SinkNode) // 获取sink要获取的groupc.mu.Lock()defer c.mu.Unlock()info := c.fillStatus(group, request, peerAddr) // 初始化对应group peer的同步状态信息collection := request.Collection// return an immediate response if a snapshot is available and the// requested version doesn't match.if snapshot, ok := c.snapshots[group]; ok {  // 获取对应组的snapshotversion := snapshot.Version(request.Collection) // 计算版本信息scope.Debugf("Found snapshot for group: %q for %v @ version: %q",group, request.Collection, version)if version != request.VersionInfo {        // 如果sink的当前版本和source的版本不一致则推送responsescope.Debugf("Responding to group %q snapshot:\n%v\n", group, snapshot)response := &source.WatchResponse{Collection: request.Collection,Version:    version,Resources:  snapshot.Resources(request.Collection),Request:    request,}pushResponse(response)return nil}info.synced[request.Collection][peerAddr] = true}// 如果版本一致则.返回一个cancel,同时记录对应的watchs,当SetSnapshot时,也就是有更新时会进行调用c.watchCount++watchID := c.watchCountscope.Infof("Watch(): created watch %d for %s from group %q, version %q",watchID, collection, group, request.VersionInfo)info.mu.Lock()info.watches[watchID] = &responseWatch{request: request, pushResponse: pushResponse}info.mu.Unlock()cancel := func() {c.mu.Lock()defer c.mu.Unlock()if s, ok := c.status[group]; ok {s.mu.Lock()delete(s.watches, watchID)s.mu.Unlock()}}return cancel
}

扫描关注我:

istio mcp探究相关推荐

  1. Nacos 1.1.4 发布,业界率先支持 Istio MCP 协议

    点击上方蓝色"程序猿DD",选择"设为星标" 回复"资源"获取独家整理的学习资料! Nacos 是阿里巴巴开源的服务发现与配置管理项目,本次 ...

  2. 在kubernetes+istio中通过FQDN请求Nacos服务

    公众号:<尹安灿> 欢迎交流 背景是我们希望能在k8s中通过DNS方式,访问服务的FQDN来调用虚拟机注册到nacos的服务. 我们vm和k8s的网段配置了相关路由能相互访问 之前naco ...

  3. 在KubeSphere中部署微服务(阡陌)+ DevOps

    一.微服务的一般部署顺序 第一步:部署中间件,如 mysql.redis.es.mq 第二步:部署注册中心,如 nacos 第三步:部署除了 getway 以外的后端服务 第四步:部署 getway ...

  4. docker-compose Seata+Nacos部署

    1.Seata+Nacos部署 脚本说明 client 地址: https://github.com/seata/seata/tree/develop/script/client 存放用于客户端的配置 ...

  5. Linux-Nacos-服务注册中心搭建

    Linux-Nacos-服务注册中心搭建 由于springcloud 注册中心 Eureka 早已停止维护所以我们使用Nacos 官方: https://nacos.io/zh-cn/docs/wha ...

  6. springboot+feign+nacos+seata+docker整合踏坑实录

    springboot+feign+nacos+seata+docker整合踏坑实录 一.版本 springboot:2.7.2 feign:3.1.5 jdk:19 seata:1.5.2 nacos ...

  7. RuoYi-Cloud 部署

    RuoYi-Cloud部署 1. 下载 点击右侧链接可以进入gitee的源码下载地址: 偌依微服务源码gitee下载地址 2. 数据库部署 依据如下步骤创建系统所需数据环境,脚本执行没有先后次序要求: ...

  8. docker开发环境搭建(windows)

    目录 Docker开发环境搭建 1.1mysql 1.1.1拉取镜像 1.1.2本地新建目录(windows为例) 1.1.3创建容器并添加本地映射 1.1.4连接容器数据库验证成功 2.1nacos ...

  9. nacos在windows系统下单机模式启动四部曲(2.1.2,重置密码)

    一.下载window版本并解压 nacos2.1.2压缩包nacos-server-Java文档类资源-CSDN下载 二.初始化数据并配置 1.将conf文件夹下mysql-schema.sql,表初 ...

最新文章

  1. 马斯克新视频:Boring公司将优先解决公交快速通勤
  2. Python使用matplotlib可视化气泡图、并使用encircle函数自定义多边形圈定可视化图像中的指定区域(Bubble plot with Encircling)
  3. 杭电oj1257最少拦截系统—贪心/dp最大递增子序列
  4. java 单引号的字符串类型_Java程序以字符串形式显示双引号和单引号
  5. 为什么睡觉时身体突然抖一下?答案吓到我了!
  6. Windows安装Python教程
  7. pytorchgpu测试_pytorch学习(十)—训练并测试CNN网络
  8. 【渝粤题库】陕西师范大学800002 地球概论
  9. Linux下C++可视化调试神器vgdb
  10. MySQL5.7 服务 crash 后无法启动
  11. iOS动画和第三方插件学习网址
  12. Paper reading:A simple baseline for one-shot multi-object tracking(二)
  13. 达梦数据库角色、用户管理
  14. 计算机开机键盘屏幕无反应,戴尔电脑开机键亮但为什么屏幕没有反应
  15. C++SLT入门简介
  16. Visual Studio Code中设置HTML/HTML5模板
  17. 如何在CSDN个人主页添加公众号或者个人微信二维码
  18. 【vue】avue-crud配置大全-持续更新
  19. STM32CubeMX(stm32F030C8T6) 之RTC闹钟唤醒停机模式-STM32开发实战 (2)
  20. 技术分享 | 学做测试平台开发-Vuetify 框架

热门文章

  1. 瑞萨电子Rcar-H3的qnx系统开发
  2. Tomcat 服务器的部署(优化)及配置虚拟主机
  3. java按钮倒计时_Hyena-倒计时按钮
  4. Okio源码学习分析
  5. NSAT-2000晶体管自动化测试系统
  6. html中如何插入草书,行草书转折十法举例
  7. 未来计算机相关资料,“诺奖级”材料!有望成为未来计算机和消费电子产品的“新基建”...
  8. Selenium 实战到吹牛系列:八
  9. 既然选择了远方,便只顾风雨兼程--myvue
  10. 素数计数公式全面拉丁化改写-小有改进-Meissel公式-梅塞尔-Lehmer公式-莱梅=勒梅尔-筛法三种形式-孟庆余公式(转载)...