环境
环境的搭建此处省略一万句,百度上有无数资料
什么是proto文件?
proto文件来预先定义的消息格式。数据包是按照proto文件所定义的消息格式完成二进制码流的编码和解码,简单来说可以类比为API文档,请求与返回的结构,工作流模式
都是事先定义好的。实例如下,我们编写文件rpc.proto
:syntax = "proto3";
option go_package = "./;pb";
service Greeter {
// 定义方法,stream是常量,流模式
rpc ServerStream (StreamRequestData) returns (stream StreamResponseData); //服务端流模式,拉消息
rpc ClientStream (stream StreamRequestData) returns (StreamResponseData); //客户端流模式,推消息
rpc AllStream (stream StreamRequestData) returns (stream StreamResponseData); //双向流模式,能推能拉
rpc OneStream (StreamRequestData) returns (StreamResponseData); //普通流模式
}
message StreamRequestData {
string data = 1; //编号
}
message StreamResponseData {
string data = 1; //编号
}
syntax
:指定使用的Protobuf协议版本,类似于在写python代码时开头加上python2/3
option go_package
:生成的代码放入指定的package中,这里使用的是Go,所以是go_package
service
:用于定义方法,结构内部为rpc + 函数名 (形参) + returns + (返回结构),在参数中有关键字 Stream,在参数前加入此关键字决定了该函数的工作流模式
message
:声明了消息体,内部结构可以是基础的类型,也可以再包含一个message作复合类型,每个参数后面的数字代表了序号,这决定了在Protobuf数据包的序列化、反序列化时,该字段的具体排序。
工作流模式
:普通流模式
,客户端流模式
,服务端流模式
,双向流模式
(以下用粗糙的场景进行类比,方便理解但不一定合适)
普通流模式
:类似用调用自己代码里的函数,亦或者一个简单个http请求,带着参数发送请求,得到结果
客户端流模式
:由客户端不断给服务端发送消息流,由服务端的逻辑来决定什么时候响应客户端
服务端流模式
:在客户端发起请求后,服务端不断的给客户端推消息流,类似于视频预览的请求,由客户端发起预览请求,服务端不断的推送视频数据(当然gRPC并不适用于视频的解析)
双向流模式
:类似聊天室,websocket,互相发送消息,一个全双工的通信
使用protoc命令生成gRPC代码,例如
protoc --go_out=. --go-grpc_out=. rpc.proto
xxx_out后指定的是路径,路径为当前终端打开位置的相对路径,这里有一个坑,指定路径时./
要与文件名之间有一个空格!!!
根据以上可以生成rpc.pb.go
与rpc_grpc.pb.go
两个文件,到此可以开始编写服务端与客户端的代码了
生成的代码IDE有可能会报错,升级一下IDE或者proto插件版本即可解决
//创建连接,使用忽略安全的模式,绑定端口8080
conn, err := grpc.Dial("127.0.0.1:8080", grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
panic(err.Error())
}
//关闭连接
defer conn.Close()
//创建客户端
rpc := pb.NewGreeterClient(conn)
//到此可以使用创建的客户端对象rpc来调用方法
//rpc.ServerStream(xxxxxx)
各个方法的传参可以到生成的rpc_grpc.pb.go
中查看
type GreeterClient interface {
// 定义方法,stream是常量,流模式
ServerStream(ctx context.Context, in *StreamRequestData, opts ...grpc.CallOption) (Greeter_ServerStreamClient, error)
ClientStream(ctx context.Context, opts ...grpc.CallOption) (Greeter_ClientStreamClient, error)
AllStream(ctx context.Context, opts ...grpc.CallOption) (Greeter_AllStreamClient, error)
OneStream(ctx context.Context, in *StreamRequestData, opts ...grpc.CallOption) (*StreamResponseData, error)
}
type GreeterServer struct {
}
//例子,在rpc_grpc.pb.go的interface中声明的接口此处要全部实现
func (this *GreeterServer) ClientStream(cli pb.Greeter_ClientStreamServer) error {
}
func main() {
g := grpc.NewServer()
pb.RegisterGreeterServer(g, new(GreeterServer))
list, _ := net.Listen("tcp", "127.0.0.1:8080")
g.Serve(list)
}
接口中函数的声明,其形参和返回参数可以到rpc_grpc.pb.go
文件中查看
type GreeterServer interface {
// 定义方法,stream是常量,流模式
ServerStream(*StreamRequestData, Greeter_ServerStreamServer) error
ClientStream(Greeter_ClientStreamServer) error
AllStream(Greeter_AllStreamServer) error
OneStream(context.Context, *StreamRequestData) (*StreamResponseData, error)
MustEmbedUnimplementedGreeterServer() //这里不知道为什么生成的时候是小写的,又很蛋疼的必须实现,手动改为大写
}
params := new(pb.StreamRequestData)
params.Data = "hello"
ctr, _ := rpc.OneStream(context.Background(), params)
服务端
func (this *GreeterServer) OneStream(ctx context.Context, req *pb.StreamRequestData) (*pb.StreamResponseData, error) {
fmt.Println(req.GetData())
return &pb.StreamResponseData{
Data: "heiheihei",
}, nil
}
客户端流模式
客户端
puts, _ := rpc.ClientStream(context.Background())
for i := 0; i < 10; i++ {
//循环向服务端发送消息
puts.Send(&pb.StreamRequestData{
Data: "xxx",
})
}
服务端
func (this *GreeterServer) ClientStream(cli pb.Greeter_ClientStreamServer) error {
for {
a, err := cli.Recv()
if err != nil {
fmt.Println(err)
return nil
}
fmt.Println(a)
}
return nil
}
服务端端流模式
客户端
params := new(pb.StreamRequestData)
params.Data = "xxx"
rec, _ := rpc.ServerStream(context.Background(), params)
for {
//循环接收服务端的消息
data, _ := rec.Recv()
fmt.Println(data.GetData())
}
服务端
func (this *GreeterServer) ServerStream(req *pb.StreamRequestData, rep pb.Greeter_ServerStreamServer) error {
if req.GetData() == "xxx" {
for {
params := new(pb.StreamResponseData)
params.Data = "heiheihei"
//循环向客户端发送消息
rep.Send(params)
}
}
return nil
}
双向流模式
客户端
allStr, _ := rpc.AllStream(context.Background())
wg := sync.WaitGroup{}
wg.Add(1)
//接受服务端消息的协程
go func() {
defer wg.Done()
for {
//业务代码
res, err := allStr.Recv()
if err != nil {
fmt.Println("服务端流数据发送over:", err)
break
}
fmt.Println("服务端消息:", res.Data)
}
}()
//发送消息给服务端的协程
go func() {
defer wg.Done()
i := 0
for {
i++
//业务代码
_ = allStr.Send(&pb.StreamRequestData{
Data: fmt.Sprintf("hello"),
})
time.Sleep(time.Second * 1)
if i > 10 {
break
}
}
}()
wg.Wait()
服务端
func (this *GreeterServer) AllStream(allStr pb.Greeter_AllStreamServer) error {
wg := sync.WaitGroup{}
wg.Add(2)
//接受客户端消息的协程
go func() {
defer wg.Done()
for {
//业务代码
res, err := allStr.Recv()
if err != nil {
fmt.Println("客户端流数据over:", err)
break
}
fmt.Println("收到客户端消息:", res.Data)
}
}()
//发送消息给客户端的协程
go func() {
defer wg.Done()
i := 0
for {
i++
//业务代码
_ = allStr.Send(&pb.StreamResponseData{
Data: fmt.Sprintf("world"),
})
time.Sleep(time.Second * 1)
if i > 10 {
break
}
}
}()
wg.Wait()
return nil
}
}