Table of Contents

  • 1. 前言
  • 2. 源码目录浏览
  • 3. 客户端
  • 4. 相关链接

1 前言

grpc是一个通用的rpc框架,用google实现,当然也有go语言的版本。在工作中主要用到这个库,所以看看源码加强自己对框架的了解。目前来说主要分析的都以go版本为主(并没有看其他语言版本).由于个人水平有限,代码中的有些思想也是个人揣测,难免有些错误,如果发现错误,还望帮忙指出。

2 源码目录浏览

grpc使用protobuf(google的序列化框架)作为通信协议,底层上使用http2作为其传输协议,grpc源码中自己实现了http2的服务端跟客户端,而并没有用net/http包。http2有很多特性能够高效的传输数据,具体特点可以看相关链接详细了解。 grpc目录如下:看名字大概能看出这些目录中代码是哪些关系,documentation目录是存放一些文档,benchmark是压测,credentials是验证,examples是例子,grpclb是负载均衡,grpclog是日志,health是服务健康检查,metadata是元数据(用户客户端给服务端传送一些特殊数据,具体可以看相关链接),naming目录是提供名字服务需要实现的接口(相当于一个dns),stats是统计信息,transport 传输层实现(主要是http2的客户端与服务端时实现, 不会详细说这个目录),还有其他一些比较无关紧要的目录就不一一介绍了。

3 客户端

在example目录中有两个比较简单的例子,就先从这里入手吧,

func main() {// Set up a connection to the server.//建立一个链接conn, err := grpc.Dial(address, grpc.WithInsecure())if err != nil {log.Fatalf("did not connect: %v", err)}defer conn.Close()c := pb.NewGreeterClient(conn)// Contact the server and print out its response.name := defaultNameif len(os.Args) > 1 {name = os.Args[1]}//调用函数r, err := c.SayHello(context.Background(), &pb.HelloRequest{Name: name})if err != nil {log.Fatalf("could not greet: %v", err)}log.Printf("Greeting: %s", r.Message)
}

grcp.WithInsecure参数是在链接https服务端时不用检查服务端的证书(要是你相信服务端就不用检查).Dial函数对服务端建立一个连接, grpc.Dial函数:

func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *ClientConn, err error) {cc := &ClientConn{target: target,conns:  make(map[Address]*addrConn),}cc.ctx, cc.cancel = context.WithCancel(context.Background())defer func() {select {case <-ctx.Done():conn, err = nil, ctx.Err()default:}if err != nil {cc.Close()}}()//设置grpc的各种选项for _, opt := range opts {opt(&cc.dopts)}// Set defaults.if cc.dopts.codec == nil {//默认用protobuf编解码cc.dopts.codec = protoCodec{}}if cc.dopts.bs == nil {cc.dopts.bs = DefaultBackoffConfig}creds := cc.dopts.copts.TransportCredentials//验证信息if creds != nil && creds.Info().ServerName != "" {cc.authority = creds.Info().ServerName} else {colonPos := strings.LastIndex(target, ":")if colonPos == -1 {colonPos = len(target)}cc.authority = target[:colonPos]}var ok boolwaitC := make(chan error, 1)//启动一个goroutine启动名字服务器(类似dns)go func() {var addrs []Addressif cc.dopts.balancer == nil {// Connect to target directly if balancer is nil.// 如果没设置负载均衡器,则直接连接addrs = append(addrs, Address{Addr: target})} else {var credsClone credentials.TransportCredentialsif creds != nil {credsClone = creds.Clone()}config := BalancerConfig{DialCreds: credsClone,}//启动负载均衡服务if err := cc.dopts.balancer.Start(target, config); err != nil {waitC <- errreturn}ch := cc.dopts.balancer.Notify()if ch == nil {// There is no name resolver installed.addrs = append(addrs, Address{Addr: target})} else {addrs, ok = <-chif !ok || len(addrs) == 0 {waitC <- errNoAddrreturn}}}for _, a := range addrs {//给每个地址一个conn,连接池if err := cc.resetAddrConn(a, false, nil); err != nil {waitC <- errreturn}}close(waitC)}()var timeoutCh <-chan time.Timeif cc.dopts.timeout > 0 {timeoutCh = time.After(cc.dopts.timeout)}select {case <-ctx.Done():return nil, ctx.Err()case err := <-waitC:if err != nil {return nil, err}case <-timeoutCh:return nil, ErrClientConnTimeout}// If balancer is nil or balancer.Notify() is nil, ok will be false here.// The lbWatcher goroutine will not be created.if ok {go cc.lbWatcher()}return cc, nil
}

通过dial这个函数,grpc已经建立了到服务端的连接,启动了自定义负载平衡(如果有的话). pb.NewGreeterClient这行代码是通过protoc工具自动生成的,它包一个grpc连接包裹在一个struct内方便调用生成的客户端grpc调用代码。接下来grpc客户端调用SayHello向服务器发送rpc请求。

func (c *greeterClient) SayHello(ctx context.Context, in *HelloRequest, opts ...grpc.CallOption) (*HelloReply, error) {out := new(HelloReply)//调用实际的发送请求函数err := grpc.Invoke(ctx, "/helloworld.Greeter/SayHello", in, out, c.cc, opts...)if err != nil {return nil, err}return out, nil
}//最后主要是invoke函数
func invoke(ctx context.Context, method string, args, reply interface{}, cc *ClientConn, opts ...CallOption) (e error) {c := defaultCallInfofor _, o := range opts {//调用之前的hookif err := o.before(&c); err != nil {return toRPCErr(err)}}defer func() {for _, o := range opts {//执行完后的hooko.after(&c)}}()//trace相关代码if EnableTracing {c.traceInfo.tr = trace.New("grpc.Sent."+methodFamily(method), method)defer c.traceInfo.tr.Finish()c.traceInfo.firstLine.client = trueif deadline, ok := ctx.Deadline(); ok {c.traceInfo.firstLine.deadline = deadline.Sub(time.Now())}c.traceInfo.tr.LazyLog(&c.traceInfo.firstLine, false)// TODO(dsymonds): Arrange for c.traceInfo.firstLine.remoteAddr to be set.defer func() {if e != nil {c.traceInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{e}}, true)c.traceInfo.tr.SetError()}}()}//统计相关代码if stats.On() {ctx = stats.TagRPC(ctx, &stats.RPCTagInfo{FullMethodName: method})begin := &stats.Begin{Client:    true,BeginTime: time.Now(),FailFast:  c.failFast,}stats.HandleRPC(ctx, begin)}defer func() {//结束后的统计相关代码if stats.On() {end := &stats.End{Client:  true,EndTime: time.Now(),Error:   e,}stats.HandleRPC(ctx, end)}}()topts := &transport.Options{Last:  true,Delay: false,}for {var (err    errort      transport.ClientTransportstream *transport.Stream// Record the put handler from Balancer.Get(...). It is called once the// RPC has completed or failed.put func())// TODO(zhaoq): Need a formal spec of fail-fast.//传输层的配置callHdr := &transport.CallHdr{Host:   cc.authority,Method: method,}if cc.dopts.cp != nil {callHdr.SendCompress = cc.dopts.cp.Type()}gopts := BalancerGetOptions{BlockingWait: !c.failFast,}//得到传输成连接,在http2中一个传输单位是一个流。t, put, err = cc.getTransport(ctx, gopts)if err != nil {// TODO(zhaoq): Probably revisit the error handling.if _, ok := err.(*rpcError); ok {return err}if err == errConnClosing || err == errConnUnavailable {if c.failFast {return Errorf(codes.Unavailable, "%v", err)}continue}// All the other errors are treated as Internal errors.return Errorf(codes.Internal, "%v", err)}if c.traceInfo.tr != nil {c.traceInfo.tr.LazyLog(&payload{sent: true, msg: args}, true)}// 发送请求stream, err = sendRequest(ctx, cc.dopts.codec, cc.dopts.cp, callHdr, t, args, topts)if err != nil {if put != nil {put()put = nil}// Retry a non-failfast RPC when// i) there is a connection error; or// ii) the server started to drain before this RPC was initiated.// 在这两种情况下重试,1 链接错误 2 在rpc初始化之前服务端已经开始服务if _, ok := err.(transport.ConnectionError); ok || err == transport.ErrStreamDrain {if c.failFast {return toRPCErr(err)}continue}return toRPCErr(err)}//收消息err = recvResponse(ctx, cc.dopts, t, &c, stream, reply)if err != nil {if put != nil {put()put = nil}if _, ok := err.(transport.ConnectionError); ok || err == transport.ErrStreamDrain {if c.failFast {return toRPCErr(err)}continue}return toRPCErr(err)}if c.traceInfo.tr != nil {c.traceInfo.tr.LazyLog(&payload{sent: false, msg: reply}, true)}//关闭一个http2流t.CloseStream(stream, nil)if put != nil {put()put = nil}//Errorf会判断返回十分okreturn Errorf(stream.StatusCode(), "%s", stream.StatusDesc())}
}

在这个函数最主要是两个函数,一个是sendRequest,一个是recvResponse,首先看看sendRequest函数:

func sendRequest(ctx context.Context, codec Codec, compressor Compressor, callHdr *transport.CallHdr, t transport.ClientTransport, args interface{}, opts *transport.Options) (_ *transport.Stream, err error) {// 创建一个http2流stream, err := t.NewStream(ctx, callHdr)if err != nil {return nil, err}defer func() {if err != nil {// If err is connection error, t will be closed, no need to close stream here.if _, ok := err.(transport.ConnectionError); !ok {t.CloseStream(stream, err)}}}()var (cbuf       *bytes.BufferoutPayload *stats.OutPayload)//压缩不为空if compressor != nil {cbuf = new(bytes.Buffer)}//统计if stats.On() {outPayload = &stats.OutPayload{Client: true,}}//编码并压缩数据outBuf, err := encode(codec, args, compressor, cbuf, outPayload)if err != nil {return nil, Errorf(codes.Internal, "grpc: %v", err)}//写入流err = t.Write(stream, outBuf, opts)if err == nil && outPayload != nil {outPayload.SentTime = time.Now()stats.HandleRPC(ctx, outPayload)}// t.NewStream(...) could lead to an early rejection of the RPC (e.g., the service/method// does not exist.) so that t.Write could get io.EOF from wait(...). Leave the following// recvResponse to get the final status.if err != nil && err != io.EOF {return nil, err}// Sent successfully.return stream, nil
}

可以看到这个函数相当简单,做了两件事情,编码压缩数据并发送.再来看看recvResponse函数:

func recvResponse(ctx context.Context, dopts dialOptions, t transport.ClientTransport, c *callInfo, stream *transport.Stream, reply interface{}) (err error) {// Try to acquire header metadata from the server if there is any.defer func() {if err != nil {if _, ok := err.(transport.ConnectionError); !ok {t.CloseStream(stream, err)}}}()c.headerMD, err = stream.Header()if err != nil {return}p := &parser{r: stream}var inPayload *stats.InPayloadif stats.On() {inPayload = &stats.InPayload{Client: true,}}for {//一直读到流关闭if err = recv(p, dopts.codec, stream, dopts.dc, reply, math.MaxInt32, inPayload); err != nil {if err == io.EOF {break}return}}if inPayload != nil && err == io.EOF && stream.StatusCode() == codes.OK {// TODO in the current implementation, inTrailer may be handled before inPayload in some cases.// Fix the order if necessary.stats.HandleRPC(ctx, inPayload)}c.trailerMD = stream.Trailer()return nil
}func recv(p *parser, c Codec, s *transport.Stream, dc Decompressor, m interface{}, maxMsgSize int, inPayload *stats.InPayload) error {//接受数据pf, d, err := p.recvMsg(maxMsgSize)if err != nil {return err}if inPayload != nil {inPayload.WireLength = len(d)}if err := checkRecvPayload(pf, s.RecvCompress(), dc); err != nil {return err}if pf == compressionMade {//解压d, err = dc.Do(bytes.NewReader(d))if err != nil {return Errorf(codes.Internal, "grpc: failed to decompress the received message %v", err)}}if len(d) > maxMsgSize {// TODO: Revisit the error code. Currently keep it consistent with java// implementation.return Errorf(codes.Internal, "grpc: received a message of %d bytes exceeding %d limit", len(d), maxMsgSize)}//数据解码if err := c.Unmarshal(d, m); err != nil {return Errorf(codes.Internal, "grpc: failed to unmarshal the received message %v", err)}if inPayload != nil {inPayload.RecvTime = time.Now()inPayload.Payload = m// TODO truncate large payload.inPayload.Data = dinPayload.Length = len(d)}return nil
}

这里可以看到一个recvRespon可能会处理多个返回,但是确实在同一个for循环中处理的,有点奇怪。客户端代码大概就是这个流程。代码来说不算太复杂。(主要不钻进http2的实现,刚开始我就去看http2,一头雾水) 其中还有重要的地方就是负载均衡,通过它我们可以根据算法自动选择要连接的ip跟地址,还有验证的使用,放到下一篇吧

4 相关链接

  1. https://github.com/grpc/grpc/blob/master/doc/load-balancing.md 负载均衡
  2. https://www.gitbook.com/book/ye11ow/http2-explained/details 介绍http2的书籍,写的非常好
  3. http://www.grpc.io/docs/guides/concepts.html#metadata metadata介绍,在源码的Documentation目录有metadata的详细介绍
https://guidao.github.io/grpc.html

GRPC golang版源码分析之客户端(一)相关推荐

  1. GRPC golang版源码分析之客户端(二)

    Table of Contents 1. 前言 2. 负载均衡 3. 相关链接 1 前言 前面一篇文章分析了一个grpc call的大致调用流程,顺着源码走了一遍,但是grpc中有一些特性并没有进行分 ...

  2. zookeeper源码分析之三客户端发送请求流程

    znode 可以被监控,包括这个目录节点中存储的数据的修改,子节点目录的变化等,一旦变化可以通知设置监控的客户端,这个功能是zookeeper对于应用最重要的特性,通过这个特性可以实现的功能包括配置的 ...

  3. golang map源码分析

    2019独角兽企业重金招聘Python工程师标准>>> 1. map数据结构 Golang的map使用哈希表作为底层实现,一个哈希表里可以有多个哈希表节点,也即bucket,而每个b ...

  4. Spring Cloud源码分析——Ribbon客户端负载均衡

    年前聊了Eureka和Zookeeper的区别,然后微服务架构系列就鸽了三个多月,一直沉迷逛B站,无法自拔.最近公司复工,工作状态慢慢恢复(又是元气满满地划水).本文从以下3个方面进行分析(参考了翟永 ...

  5. Golang bytes源码分析

    bytes/bytes.go源码分析 Golang JDK 1.10.3 bytes包提供了操作[]byte的常用方法. 源码分析 func equalPortable(a, b []byte) bo ...

  6. android+小米文件管理器源码,[MediaStore]小米文件管理器android版源码分析——数据来源...

    打开小米的文件管理器,我们很快会看到如下图所示的界面: 其中,会把各种文件分类显示.并且显示出每种文件的个数. 这是怎么做到的呢?当然不是每次启动都查询sdcard和应用程序data目录文件啦,那样实 ...

  7. Golang channel 源码分析

    以下源码都摘自 golang 1.16.15 版本. 1. channel 底层结构 Golang 中的 channel 对应的底层结构为 hchan 结构体(channel的源码位置在Golang包 ...

  8. SpringBoot+El-upload实现上传文件到通用上传接口并返回文件全路径(若依前后端分离版源码分析)

    场景 SpringBoot+ElementUI实现通用文件下载请求(全流程图文详细教程): https://blog.csdn.net/BADAO_LIUMANG_QIZHI/article/deta ...

  9. redis源码分析 ppt_Redis源码分析之客户端+数据库

    客户端 数据结构 1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950 ...

最新文章

  1. 随手小记:快速适应未必是个好策略
  2. ITK:计算灰度图像的梯度幅度递推高斯
  3. R语言 深圳 面授_「深圳侦探电话」用R语言实现深度学习情感分析
  4. 衡量 mysql性能状态 参数 详解
  5. 【Java文件操作(七)】序列化:将自定义类写入文件
  6. SQL:find duplicate rows -- using group or having
  7. 中医科学院院士团队解析丹参纯合基因组和新基因簇在丹参酮合成中的作用
  8. Page.IsPostBack
  9. 关键词组合工具-自动关键词挖掘组成关键词软件免费
  10. bigemap地图下载器与91位图有何区别
  11. 四 H264解码输出yuv文件
  12. Mac系统制作win10启动U盘踩坑实操
  13. USPS国际快递查询单号
  14. 戴尔3080计算机重装系统步骤,终于发现戴尔笔记本重装系统的方法
  15. 在Apple Watch上了解时间旅行
  16. 元白:欲买桂花同载酒,终不似,少年游。
  17. 东方财富股票接口解析优缺点
  18. eclipse写程序从hdfs上下载文件到本地报错:at org.apache.hadoop.util.Shell.runCommand
  19. varlimo阿米洛机械键盘 win lock锁定
  20. 采用以太坊智能合约技术的报名系统源码

热门文章

  1. [深入浅出WP8.1(Runtime)]Socket编程之UDP协议
  2. 轻松搞定javascript变量(闭包,预解析机制,变量在内存的分配 )
  3. Webservice 或者HttpRequest请求的时候提示 “指定的注册表项不存在”错误 解决方案...
  4. C语言函数手册:c语言库函数大全|C语言标准函数库|c语言常用函数查询
  5. python如何调用c编译好可执行程序
  6. linux系统-软链接与硬链接区别
  7. 数组中两个数的最大异或值 两数异或值一定小于两数相加和
  8. PIC模拟从入门到熟练系列之组会PPT20210913《Note of PIC》
  9. Linux下修改命令提示符
  10. delphi项目开发经验2008年09月18日 星期四 10:07随着项目的失败,这些天一直在总结失败的原因,到底是为什么?