gRPC的stream介绍

  • gRPC的stream介绍
    • gRPC为什么提供 steam功能?
    • gRPC的stream的分类
      • 一元RPC
      • 客户端流RPC
      • 服务端流RPC
      • 双向流RPC

gRPC的stream介绍

gRPC为什么提供 steam功能?

  • 在以下场景使用unary rpc 可能有如下问题

    • 数据表过大可能造成熟瞬时的压力
    • 服务需要全部数据接收完毕,才能正确回调响应,进行业务处理,不能客户端边发送,服务端边接受。
  • stream rpc 适用于,大规模数据传递,和实时场景。

gRPC的stream的分类

  • stream 分类

一元RPC

  • 定义形式
rpc SayHello(HelloRequest) returns (HelloResponse);
  • 描述
    1、客户端发送一个请求给服务端,
    2、得到服务端的一个响应,
    3、就像正常的函数调用
  • client
package mainimport ("context"pb "d-grpc/lib/proto/hellowrold""google.golang.org/grpc""log"
)const (address = "localhost:50051"
)func main() {// 链接服务conn, err := grpc.Dial(address, grpc.WithInsecure(), grpc.WithBlock())if err != nil {log.Fatalf("dit not connetc:%v", err)return}defer conn.Close()// 创建一个客户端c := pb.NewGreeterClient(conn)r, err := c.SayHello(context.Background(), &pb.HelloRequest{Name: "world"})if err != nil {log.Fatalf("could not greet: %v", err)}// 打印响应信息log.Printf("Greeting: %s", r.GetMessage())
}
  • server
package mainimport ("context"pb "d-grpc/lib/proto/hellowrold""google.golang.org/grpc""log""net"
)const (port = ":50051"
)//
func main() {lis, err := net.Listen("tcp", port)if err != nil {log.Fatalf("failed to listen %v", err)}s := grpc.NewServer()pb.RegisterGreeterServer(s, &server{})if err := s.Serve(lis); err != nil {log.Fatalf("failed to serve:%v", err)}
}type server struct {// 这个结构体实现了GreeterServer接口,避免service未实现pb.UnimplementedGreeterServer
}func (*server) SayHello(ctx context.Context, in *pb.HelloRequest) (*pb.HelloReply, error) {log.Printf("Received: %v", in.GetName())return &pb.HelloReply{Message: "Hello " + in.GetName()}, nil
}

客户端流RPC

  • 定义形式
rpc LotsOfGreetings(stream HelloRequest) returns (HelloResponse);
  • 描述
    1、客户端会以流式的方式将数据写到到服务端,(客户端的写和服务端的读都是同时进行的)
    2、客户端完成写的消息后,关闭发送,并等待接收消息
    3、服务端循环读取消息,直到遇到一个io.EOF,并返回响应消息
    4、始终使用一个rpc请求来操作
  • client
package mainimport ("context"pb "d-grpc/lib/proto/echo""fmt""google.golang.org/grpc""log"
)/*
1、客户端会写一个有效的数据到服务端,
2、客户端完成写的消息后,关闭发送,并等待接收消息
3、服务端读取消息,并返回响应消息
4、始终使用一个rpc请求来操作
*/// 客户端流RPC
func main() {clientCon, err := grpc.Dial("127.0.0.1:5001", grpc.WithInsecure(), grpc.WithBlock())if err != nil {log.Fatal(err)}clientStreamingEcho(pb.NewEchoClient(clientCon))
}func clientStreamingEcho(client pb.EchoClient) {stream, err := client.ClientStreamingEcho(context.Background())if err != nil {log.Fatal(err)}for i := 1; i < 10; i++ {req := &pb.EchoRequest{Message: fmt.Sprintf("%d", i),}// 会写成 有序的消息发送给给客户端if err := stream.Send(req); err != nil {log.Fatal(err)}}// 关闭发送 并 接收数据, 让rpc知道我们客户端已经完成写的操作resp, err := stream.CloseAndRecv()if err != nil {log.Fatal(err)}fmt.Println(resp.GetMessage())
}
  • server
package mainimport (pb "d-grpc/lib/proto/echo""fmt""google.golang.org/grpc""io""log""net""strings"
)func main() {lis, err := net.Listen("tcp", "127.0.0.1:5001")if err != nil {log.Fatal(err)}s := grpc.NewServer()pb.RegisterEchoServer(s, &server{})if err := s.Serve(lis); err != nil {log.Fatal(err)}
}type server struct {pb.UnimplementedEchoServer
}// https://grpc.io/docs/languages/go/basics/
func (server) ClientStreamingEcho(stream pb.Echo_ClientStreamingEchoServer) error {// 循环读取客户端链接信息var strList []stringfor {// 不断取得客户端请求数据req, err := stream.Recv()strList = append(strList, req.GetMessage())// 代表客户端已经结束发送数据了if err == io.EOF {fmt.Println(strings.Join(strList, ","))return stream.SendAndClose(&pb.EchoResponse{Message: "已经全部接受完毕",})}if err != nil {return err}}
}

服务端流RPC

  • 定义形式
rpc LotsOfReplies(HelloRequest) returns (stream HelloResponse);
  • 描述
    1、客户端发送一个请求给服务端,得到一个stream
    2、循环从流中读取服务端返回的有序数据
    3、读取消息直到遇到一个io.EOF则表示服务端没后更多的消息

  • client

package mainimport ("context"pb "d-grpc/lib/proto/echo""fmt""google.golang.org/grpc""io""log""strings"
)/**/
func main() {clientCon, err := grpc.Dial("127.0.0.1:5001", grpc.WithInsecure(), grpc.WithBlock())if err != nil {log.Fatal(err)}echoClient := pb.NewEchoClient(clientCon)req := &pb.EchoRequest{}stream, err := echoClient.ServerStreamingEcho(context.Background(), req)if err != nil {log.Fatal(err)}var strList []stringfor {resp, err := stream.Recv()if err == nil {strList = append(strList, resp.GetMessage())}if err == io.EOF {fmt.Println(err)break}if err != nil {log.Println(err)}}fmt.Println(strings.Join(strList, ","))}
  • server
package mainimport (pb "d-grpc/lib/proto/echo""fmt""google.golang.org/grpc""log""net"
)func main() {lis, err := net.Listen("tcp", "127.0.0.1:5001")if err != nil {log.Fatal(err)}s := grpc.NewServer()pb.RegisterEchoServer(s, &server{})if err := s.Serve(lis); err != nil {log.Fatal(err)}
}type server struct {pb.UnimplementedEchoServer
}func (server) ServerStreamingEcho(req *pb.EchoRequest, stream pb.Echo_ServerStreamingEchoServer) error {// 向客户端发送数据for i := 1; i < 10; i++ {err := stream.Send(&pb.EchoResponse{Message: fmt.Sprintf("%d", i),})if err != nil {return err}}// 返回 nil 或者 err就代表服务端响应数据完毕return nil
}

双向流RPC

  • 定义形式
rpc BidiHello(stream HelloRequest) returns (stream HelloResponse);
  • 描述
    双向流RPC,客户端和服务端异步的读取和接收数据
  • client
package mainimport ("context"pb "d-grpc/lib/proto/echo""fmt""google.golang.org/grpc""io""log"
)// 客户端流RPC
func main() {clientCon, err := grpc.Dial("127.0.0.1:5001", grpc.WithInsecure(), grpc.WithBlock())if err != nil {log.Fatal(err)}bidirectionalStreamingEcho(pb.NewEchoClient(clientCon))
}func bidirectionalStreamingEcho(client pb.EchoClient) {stream, err := client.BidirectionalStreamingEcho(context.Background())if err != nil {log.Fatal(err)}done := make(chan struct{})// 异步读取服务端消息go func() {for {resp, err := stream.Recv()if err == io.EOF {close(done)return}if err != nil {log.Fatal(err)}log.Printf("recv-server-%s", resp.GetMessage())}}()// 想服务端发送消息for i := 1; i < 10; i++ {err := stream.Send(&pb.EchoRequest{Message: fmt.Sprintf("%d", i),})if err != nil {log.Fatal(err)}}// 部分发送了-代表客户端的数据已经放完毕了_ = stream.CloseSend()// 服务端消息接受完毕,退出程序<-done
}
  • server
package mainimport (pb "d-grpc/lib/proto/echo""fmt""google.golang.org/grpc""io""log""net"
)func main() {lis, err := net.Listen("tcp", "127.0.0.1:5001")if err != nil {log.Fatal(err)}s := grpc.NewServer()pb.RegisterEchoServer(s, &server{})if err := s.Serve(lis); err != nil {log.Fatal(err)}
}type server struct {pb.UnimplementedEchoServer
}func (server) BidirectionalStreamingEcho(stream pb.Echo_BidirectionalStreamingEchoServer) error {for {req, err := stream.Recv()if err == io.EOF {return nil}if err != nil {return err}log.Printf("recv-client-%s\n", req.GetMessage())// 发送响应的数据err = stream.Send(&pb.EchoResponse{Message: fmt.Sprintf("server-%s", req.GetMessage()),})if err != nil {return err}}
}

gRPC的stream使用相关推荐

  1. dubbogo 3.0:牵手 gRPC 走向云原生时代

    作者 | 李志信  于雨 来源|阿里巴巴云原生公众号 自从 2011 年 Dubbo 开源之后,被大量中小公司采用,一直是国内最受欢迎的 RPC 框架.2014 年,由于阿里内部组织架构调整,Dubb ...

  2. GRPC的四种数据流以及案例

    GRPC的四种数据流: 1.简单模式(Simple RPC) 2.服务端数据流模式(Server-side Streaming RPC) 3.客户端数据流模式(Client-side Streamin ...

  3. gRPC系列(三) 如何借助HTTP2实现传输

    本系列分为四大部分: gRPC系列(一) 什么是RPC? gRPC系列(二) 如何用Protobuf组织内容 gRPC系列(三)如何借助HTTP2实现传输 gRPC系列(四) 框架如何赋能分布式系统 ...

  4. 后端如何发出请求_gRPC系列(三) 如何借助HTTP2实现传输

    本系列分为四大部分: gRPC系列(一) 什么是RPC? gRPC系列(二) 如何用Protobuf组织内容 gRPC系列(三) 如何借助HTTP2实现传输 gRPC系列(四) 框架如何赋能分布式系统 ...

  5. 电子书下载 | 超实用!阿里售后专家的 K8s 问题排查案例合集

    <关注公众号,回复"排查"获取下载链接> <深入浅出 Kubernetes>开放下载 本书作者罗建龙(花名声东),阿里云技术专家,有着多年操作系统和图形显卡 ...

  6. 区块链教程Fabric1.0源代码分析流言算法Gossip服务端二

    区块链教程Fabric1.0源代码分析流言算法Gossip服务端二 Fabric 1.0源代码笔记 之 gossip(流言算法) #GossipServer(Gossip服务端) 5.2.commIm ...

  7. 深度解读服务治理 ServiceMesh、xDS

    最近在同程艺龙蹲坑,聊一聊微服务治理的核心难点.历史演进.最新实现. ☺️以上内容属自我思考,如理解有偏差.理解不透彻.现状梳理不清楚的请大家多指教. 大纲 微服务治理的核心难点 方案演进的法宝:代理 ...

  8. 牌类游戏使用微服务重构笔记(八): 游戏网关服务器

    网关服务器 所谓网关,其实就是维持玩家客户端的连接,将玩家发的游戏请求转发到具体后端服务的服务器,具有以下几个功能点: 长期运行,必须具有较高的稳定性和性能 对外开放,即客户端需要知道网关的IP和端口 ...

  9. K8S从懵圈到熟练 - 节点下线姊妹篇

    之前分享过一例集群节点NotReady的问题.在那个问题中,我们的排查路劲,从K8S集群到容器运行时,再到sdbus和systemd,不可谓不复杂.那个问题目前已经在systemd中做了修复,所以基本 ...

最新文章

  1. Styling with the DataGridColumnStyle
  2. Projects(子查询、连接,分组,聚合)
  3. Netbeans and Remote Host for C/C++ Developing
  4. 现代优化计算方法_【公开课】供应链库存优化与需求预测管理
  5. Protocol Buffers C++ 入门教程
  6. LSOF 安装与使用
  7. 国际标准UTC时间转化北京时间
  8. 自动驾驶中ROS操作系统的重要性
  9. 【转载】MongoDB 极简实践入门
  10. DNS测试 nslookup
  11. 超星问卷与麦客问卷自动填写(selenium+Chrome)
  12. Dynamics 365Online 使用Xrm.WebApi.online.execute执行自定义Action
  13. 分析蓝牙协议栈源码bstack
  14. docx行间距怎么设置_Word如何调整字间距,行间距.docx
  15. 山东大学软件学院项目实训第一周
  16. ShockwaveFlash 详解
  17. 如何验证下载的Microsoft软件是否为正版??
  18. 2023年QS世界大学学科排名,计算机学科表现如何?
  19. 人工智能复习(期末考试)
  20. 2022视力矫正训练设备展,验光设备/近视矫正/近视眼镜展

热门文章

  1. 数据结构实训项目---比较一些排序
  2. ASEMI电磁炉整流桥KBPC2510怎么测量好坏
  3. docker-compose up之Windows named pipe error:(code: 2)
  4. 什么是数字化营销?与ChatGPT结合能产生的化学反应?
  5. C++/C++11中std::transform的使用
  6. Delphi中Move、CopyMemory操作
  7. [c++整人代码]耗尽电脑内存 「v1.0」
  8. php获取qq号码,php获取qq用户昵称和在线状态(实例分析)
  9. 雨林木风 Ghost Win7 SP1 装机版 2013.04
  10. Bash Export命令