gRPC的stream使用
gRPC的stream介绍
- gRPC的stream介绍
- gRPC为什么提供 steam功能?
- gRPC的stream的分类
- 一元RPC
- 客户端流RPC
- 服务端流RPC
- 双向流RPC
gRPC的stream介绍
gRPC为什么提供 steam功能?
在以下场景使用unary rpc 可能有如下问题
- 数据表过大可能造成熟瞬时的压力
- 服务需要全部数据接收完毕,才能正确回调响应,进行业务处理,不能客户端边发送,服务端边接受。
stream rpc 适用于,大规模数据传递,和实时场景。
gRPC的stream的分类
- stream 分类
一元RPC
- 定义形式
rpc SayHello(HelloRequest) returns (HelloResponse);
- 描述
1、客户端发送一个请求给服务端,
2、得到服务端的一个响应,
3、就像正常的函数调用
- client
package mainimport ("context"pb "d-grpc/lib/proto/hellowrold""google.golang.org/grpc""log"
)const (address = "localhost:50051"
)func main() {// 链接服务conn, err := grpc.Dial(address, grpc.WithInsecure(), grpc.WithBlock())if err != nil {log.Fatalf("dit not connetc:%v", err)return}defer conn.Close()// 创建一个客户端c := pb.NewGreeterClient(conn)r, err := c.SayHello(context.Background(), &pb.HelloRequest{Name: "world"})if err != nil {log.Fatalf("could not greet: %v", err)}// 打印响应信息log.Printf("Greeting: %s", r.GetMessage())
}
- server
package mainimport ("context"pb "d-grpc/lib/proto/hellowrold""google.golang.org/grpc""log""net"
)const (port = ":50051"
)//
func main() {lis, err := net.Listen("tcp", port)if err != nil {log.Fatalf("failed to listen %v", err)}s := grpc.NewServer()pb.RegisterGreeterServer(s, &server{})if err := s.Serve(lis); err != nil {log.Fatalf("failed to serve:%v", err)}
}type server struct {// 这个结构体实现了GreeterServer接口,避免service未实现pb.UnimplementedGreeterServer
}func (*server) SayHello(ctx context.Context, in *pb.HelloRequest) (*pb.HelloReply, error) {log.Printf("Received: %v", in.GetName())return &pb.HelloReply{Message: "Hello " + in.GetName()}, nil
}
客户端流RPC
- 定义形式
rpc LotsOfGreetings(stream HelloRequest) returns (HelloResponse);
- 描述
1、客户端会以流式的方式将数据写到到服务端,(客户端的写和服务端的读都是同时进行的)
2、客户端完成写的消息后,关闭发送,并等待接收消息
3、服务端循环读取消息,直到遇到一个io.EOF,并返回响应消息
4、始终使用一个rpc请求来操作
- client
package mainimport ("context"pb "d-grpc/lib/proto/echo""fmt""google.golang.org/grpc""log"
)/*
1、客户端会写一个有效的数据到服务端,
2、客户端完成写的消息后,关闭发送,并等待接收消息
3、服务端读取消息,并返回响应消息
4、始终使用一个rpc请求来操作
*/// 客户端流RPC
func main() {clientCon, err := grpc.Dial("127.0.0.1:5001", grpc.WithInsecure(), grpc.WithBlock())if err != nil {log.Fatal(err)}clientStreamingEcho(pb.NewEchoClient(clientCon))
}func clientStreamingEcho(client pb.EchoClient) {stream, err := client.ClientStreamingEcho(context.Background())if err != nil {log.Fatal(err)}for i := 1; i < 10; i++ {req := &pb.EchoRequest{Message: fmt.Sprintf("%d", i),}// 会写成 有序的消息发送给给客户端if err := stream.Send(req); err != nil {log.Fatal(err)}}// 关闭发送 并 接收数据, 让rpc知道我们客户端已经完成写的操作resp, err := stream.CloseAndRecv()if err != nil {log.Fatal(err)}fmt.Println(resp.GetMessage())
}
- server
package mainimport (pb "d-grpc/lib/proto/echo""fmt""google.golang.org/grpc""io""log""net""strings"
)func main() {lis, err := net.Listen("tcp", "127.0.0.1:5001")if err != nil {log.Fatal(err)}s := grpc.NewServer()pb.RegisterEchoServer(s, &server{})if err := s.Serve(lis); err != nil {log.Fatal(err)}
}type server struct {pb.UnimplementedEchoServer
}// https://grpc.io/docs/languages/go/basics/
func (server) ClientStreamingEcho(stream pb.Echo_ClientStreamingEchoServer) error {// 循环读取客户端链接信息var strList []stringfor {// 不断取得客户端请求数据req, err := stream.Recv()strList = append(strList, req.GetMessage())// 代表客户端已经结束发送数据了if err == io.EOF {fmt.Println(strings.Join(strList, ","))return stream.SendAndClose(&pb.EchoResponse{Message: "已经全部接受完毕",})}if err != nil {return err}}
}
服务端流RPC
- 定义形式
rpc LotsOfReplies(HelloRequest) returns (stream HelloResponse);
描述
1、客户端发送一个请求给服务端,得到一个stream
2、循环从流中读取服务端返回的有序数据
3、读取消息直到遇到一个io.EOF则表示服务端没后更多的消息
client
package mainimport ("context"pb "d-grpc/lib/proto/echo""fmt""google.golang.org/grpc""io""log""strings"
)/**/
func main() {clientCon, err := grpc.Dial("127.0.0.1:5001", grpc.WithInsecure(), grpc.WithBlock())if err != nil {log.Fatal(err)}echoClient := pb.NewEchoClient(clientCon)req := &pb.EchoRequest{}stream, err := echoClient.ServerStreamingEcho(context.Background(), req)if err != nil {log.Fatal(err)}var strList []stringfor {resp, err := stream.Recv()if err == nil {strList = append(strList, resp.GetMessage())}if err == io.EOF {fmt.Println(err)break}if err != nil {log.Println(err)}}fmt.Println(strings.Join(strList, ","))}
- server
package mainimport (pb "d-grpc/lib/proto/echo""fmt""google.golang.org/grpc""log""net"
)func main() {lis, err := net.Listen("tcp", "127.0.0.1:5001")if err != nil {log.Fatal(err)}s := grpc.NewServer()pb.RegisterEchoServer(s, &server{})if err := s.Serve(lis); err != nil {log.Fatal(err)}
}type server struct {pb.UnimplementedEchoServer
}func (server) ServerStreamingEcho(req *pb.EchoRequest, stream pb.Echo_ServerStreamingEchoServer) error {// 向客户端发送数据for i := 1; i < 10; i++ {err := stream.Send(&pb.EchoResponse{Message: fmt.Sprintf("%d", i),})if err != nil {return err}}// 返回 nil 或者 err就代表服务端响应数据完毕return nil
}
双向流RPC
- 定义形式
rpc BidiHello(stream HelloRequest) returns (stream HelloResponse);
- 描述
双向流RPC,客户端和服务端异步的读取和接收数据
- client
package mainimport ("context"pb "d-grpc/lib/proto/echo""fmt""google.golang.org/grpc""io""log"
)// 客户端流RPC
func main() {clientCon, err := grpc.Dial("127.0.0.1:5001", grpc.WithInsecure(), grpc.WithBlock())if err != nil {log.Fatal(err)}bidirectionalStreamingEcho(pb.NewEchoClient(clientCon))
}func bidirectionalStreamingEcho(client pb.EchoClient) {stream, err := client.BidirectionalStreamingEcho(context.Background())if err != nil {log.Fatal(err)}done := make(chan struct{})// 异步读取服务端消息go func() {for {resp, err := stream.Recv()if err == io.EOF {close(done)return}if err != nil {log.Fatal(err)}log.Printf("recv-server-%s", resp.GetMessage())}}()// 想服务端发送消息for i := 1; i < 10; i++ {err := stream.Send(&pb.EchoRequest{Message: fmt.Sprintf("%d", i),})if err != nil {log.Fatal(err)}}// 部分发送了-代表客户端的数据已经放完毕了_ = stream.CloseSend()// 服务端消息接受完毕,退出程序<-done
}
- server
package mainimport (pb "d-grpc/lib/proto/echo""fmt""google.golang.org/grpc""io""log""net"
)func main() {lis, err := net.Listen("tcp", "127.0.0.1:5001")if err != nil {log.Fatal(err)}s := grpc.NewServer()pb.RegisterEchoServer(s, &server{})if err := s.Serve(lis); err != nil {log.Fatal(err)}
}type server struct {pb.UnimplementedEchoServer
}func (server) BidirectionalStreamingEcho(stream pb.Echo_BidirectionalStreamingEchoServer) error {for {req, err := stream.Recv()if err == io.EOF {return nil}if err != nil {return err}log.Printf("recv-client-%s\n", req.GetMessage())// 发送响应的数据err = stream.Send(&pb.EchoResponse{Message: fmt.Sprintf("server-%s", req.GetMessage()),})if err != nil {return err}}
}
gRPC的stream使用相关推荐
- dubbogo 3.0:牵手 gRPC 走向云原生时代
作者 | 李志信 于雨 来源|阿里巴巴云原生公众号 自从 2011 年 Dubbo 开源之后,被大量中小公司采用,一直是国内最受欢迎的 RPC 框架.2014 年,由于阿里内部组织架构调整,Dubb ...
- GRPC的四种数据流以及案例
GRPC的四种数据流: 1.简单模式(Simple RPC) 2.服务端数据流模式(Server-side Streaming RPC) 3.客户端数据流模式(Client-side Streamin ...
- gRPC系列(三) 如何借助HTTP2实现传输
本系列分为四大部分: gRPC系列(一) 什么是RPC? gRPC系列(二) 如何用Protobuf组织内容 gRPC系列(三)如何借助HTTP2实现传输 gRPC系列(四) 框架如何赋能分布式系统 ...
- 后端如何发出请求_gRPC系列(三) 如何借助HTTP2实现传输
本系列分为四大部分: gRPC系列(一) 什么是RPC? gRPC系列(二) 如何用Protobuf组织内容 gRPC系列(三) 如何借助HTTP2实现传输 gRPC系列(四) 框架如何赋能分布式系统 ...
- 电子书下载 | 超实用!阿里售后专家的 K8s 问题排查案例合集
<关注公众号,回复"排查"获取下载链接> <深入浅出 Kubernetes>开放下载 本书作者罗建龙(花名声东),阿里云技术专家,有着多年操作系统和图形显卡 ...
- 区块链教程Fabric1.0源代码分析流言算法Gossip服务端二
区块链教程Fabric1.0源代码分析流言算法Gossip服务端二 Fabric 1.0源代码笔记 之 gossip(流言算法) #GossipServer(Gossip服务端) 5.2.commIm ...
- 深度解读服务治理 ServiceMesh、xDS
最近在同程艺龙蹲坑,聊一聊微服务治理的核心难点.历史演进.最新实现. ☺️以上内容属自我思考,如理解有偏差.理解不透彻.现状梳理不清楚的请大家多指教. 大纲 微服务治理的核心难点 方案演进的法宝:代理 ...
- 牌类游戏使用微服务重构笔记(八): 游戏网关服务器
网关服务器 所谓网关,其实就是维持玩家客户端的连接,将玩家发的游戏请求转发到具体后端服务的服务器,具有以下几个功能点: 长期运行,必须具有较高的稳定性和性能 对外开放,即客户端需要知道网关的IP和端口 ...
- K8S从懵圈到熟练 - 节点下线姊妹篇
之前分享过一例集群节点NotReady的问题.在那个问题中,我们的排查路劲,从K8S集群到容器运行时,再到sdbus和systemd,不可谓不复杂.那个问题目前已经在systemd中做了修复,所以基本 ...
最新文章
- Styling with the DataGridColumnStyle
- Projects(子查询、连接,分组,聚合)
- Netbeans and Remote Host for C/C++ Developing
- 现代优化计算方法_【公开课】供应链库存优化与需求预测管理
- Protocol Buffers C++ 入门教程
- LSOF 安装与使用
- 国际标准UTC时间转化北京时间
- 自动驾驶中ROS操作系统的重要性
- 【转载】MongoDB 极简实践入门
- DNS测试 nslookup
- 超星问卷与麦客问卷自动填写(selenium+Chrome)
- Dynamics 365Online 使用Xrm.WebApi.online.execute执行自定义Action
- 分析蓝牙协议栈源码bstack
- docx行间距怎么设置_Word如何调整字间距,行间距.docx
- 山东大学软件学院项目实训第一周
- ShockwaveFlash 详解
- 如何验证下载的Microsoft软件是否为正版??
- 2023年QS世界大学学科排名,计算机学科表现如何?
- 人工智能复习(期末考试)
- 2022视力矫正训练设备展,验光设备/近视矫正/近视眼镜展
热门文章
- 数据结构实训项目---比较一些排序
- ASEMI电磁炉整流桥KBPC2510怎么测量好坏
- docker-compose up之Windows named pipe error:(code: 2)
- 什么是数字化营销?与ChatGPT结合能产生的化学反应?
- C++/C++11中std::transform的使用
- Delphi中Move、CopyMemory操作
- [c++整人代码]耗尽电脑内存 「v1.0」
- php获取qq号码,php获取qq用户昵称和在线状态(实例分析)
- 雨林木风 Ghost Win7 SP1 装机版 2013.04
- Bash Export命令