当前位置: 首页 > 工具软件 > 流式RPC > 使用案例 >

[Go]grpc与流式rpc简介

羊浩广
2023-12-01


gRPC是一个语言中立、平台中立、高性能、通用的开源RPC框架;基于ProtoBuf(Protocol Buffers)序列化协议开发,且支持众多开发语言。

gRPC安装

安装protoc

从 https://github.com/google/protobuf/releases下载预编译的“protoc编译器”,用于生成gRPC服务代码。

解压zip文件,并将protoc二进制文件所在bin目录添加到PATH环境变量中。

安装golang插件

安装好golang后,设定环境:

go env -w GO111MODULE=auto
go env -w GOPROXY=https://goproxy.cn

安装proto-go,命令如下:

go get -u github.com/golang/protobuf/proto
go get -u github.com/golang/protobuf/protoc-gen-go

因被墙,可无法直接安装:

git clone https://github.com/golang/protobuf.git
# 进入protobuf/protoc-gen-go目录
go build
go install
# 此时,在$GOPATH下bin目录下会有protoc-gen-go  (把此目录添加到PATH环境变量)

生成

要生成grpc,有三步:

  • 编写.proto文件
  • 利用工具将.proto文件生成对应语言的代码
  • 根据生成的代码编写服务端和客户端的代码
// 在proto文件中,需要设定包名
option go_package = "./;mytest";  //路径和包名

# 编写好后,通过protoc生成对应代码():
./protoc --go_out=plugins=grpc:proto -I proto/ ./proto/product.proto

命令参数说明:

  • -I:指定proto文件存放目录;若文件不在当前目录下,则是必须的;
  • --go_out=plugins=grpc::指定生成go的代码,并且冒号后指定go代码的存放目录;

简单gRPC

gRPC 里客户端应用可以像调用本地对象一样直接调用远端服务上的应用方法,使得创建分布式应用和服务更简单。其基于以下理念:

  • 定义一个服务,指定其能够被远程调用的方法(包含参数和返回类型);
  • 在服务端实现这个接口,并运行一个 gRPC 服务器来处理客户端调用;
  • 在客户端拥有一个存根(像服务端一样的方法),代理服务端对应的接口。

定义proto

先定义个简单的请求与相应示例

syntax = "proto3";

package service;

option go_package = "./;product";

service Greeter {
rpc SayHello (HelloRequest) returns (HelloReply) {}
}

message HelloRequest {
string name = 1;
}

message HelloReply {
string message = 1;
}

把示例文件放在proto目录下,执行protoc命令:

./protoc --go_out=plugins=grpc:proto -I proto/ ./proto/product.proto

会生成product.pb.go文件,里面包含服务端与客户端的代码。

服务端

要实现proto中的服务,需要实现GreeterServer接口:

type GreeterServer interface {
	SayHello(context.Context, *HelloRequest) (*HelloReply, error)
}

先实现一个对应服务:

type server struct {
}

func (s *server) SayHello(ctx context.Context, in *product.HelloRequest) (*product.HelloReply, error) {
	log.Println("Input:", in)
	return &product.HelloReply{Message: "Hello " + in.Name}, nil
}

然后创建gRPC服务:

func main() {
	// 创建 Tcp 连接
	listen, err := net.Listen("tcp", ":5901")
	if err != nil {
		log.Println("Listen failed:", err)
		return
	}
	
	rpcServer := grpc.NewServer()
	
	// 注册服务实现者
	// 此函数在.pb.go中,自动生成
	product.RegisterGreeterServer(rpcServer, &server{})

	// 在 gRPC 服务上注册反射服务
	// 以便可通过grpcurl工具来查询服务列表或调用grpc方法
	// reflection.Register(grpcServer)

	err = grpcServer.Serve(listener)
	if err != nil {
		log.Fatalf("Start RPC Server fail: %v", err)
	}
}

客户端

客户端可通过Dial方便地连接,若要做一些参数配置,则需要通过DialContext:

func main() {
	//conn, err := grpc.Dial(":5901", grpc.WithInsecure())
	conn, err := grpc.DialContext(context.Background(), ":5901",
		grpc.WithInsecure(),
		grpc.WithKeepaliveParams(keepalive.ClientParameters{
			Time:                10 * time.Second, // send pings every 10 seconds if there is no activity
			Timeout:             2 * time.Second,  // wait 2 second for ping ack before considering the connection dead
			PermitWithoutStream: true,  // send pings even without active streams
		}))
	if err != nil {
		log.Println("Connect fail:", err)
		//return
	}
	defer conn.Close()

	count := 0
	client := product.NewGreeterClient(conn)
	for {
		time.Sleep(5 * time.Second)
		count++
		log.Println("Try", count)

		reply, err := client.SayHello(context.Background(), &product.HelloRequest{Name: "Mike" + strconv.Itoa(count)})

		if err != nil {
			log.Println("Call fail:", err)
			continue
		}

		log.Println("Reply:", reply)
	}
}

流式gRPC

相比于简单gRPC,流式(stream)gRPC实现发送/接收大量数据,或不断传输数据的场景。可分为:

  • ServerStream,服务端推送流:客户端发送一个单独的请求,服务端返回流式数据,客户端读取流式数据直到EOF
    • 服务端处理完成后(推流完所有数据),return nil代表响应完成;
    • 客户端通过err == io.EOF判断服务端是否响应完成;
  • ClientStream,客户端推送流:客户端写入流式数据,写入完成后等待服务端返回结果
    • 客户端发送完毕通过**CloseAndRecv**关闭stream 并接收服务端响应;
    • 服务端通过err == io.EOF判断客户端是否发送完毕,完毕后使用**SendAndClose**关闭 stream并返回响应;
  • BidirectionalStream,双向推送流:这两个流是独立运行的,因此客户端和服务器可以按照自己需要的顺序进行读写
    • 客户端服务端都通过stream向对方推送数据,推送完成后通过**CloseSend**关闭流;通过err == io.EOF判断服务端是否响应完成(接收完数据);
    • 服务端通过err == io.EOF判断客户端是否响应完成,通过return nil表示已经完成响应;

定义proto

定义数据结构:

syntax = "proto3";
option go_package = ".;proto";

//三个流式rpc
//GetStream服务器返回流
//PutStream客户端上传流
//DiStream双向流
service Stream{
  rpc GetStream(MsgRequest)returns(stream MsgReply){}
  rpc PutStream(stream MsgRequest)returns(MsgReply){}
  rpc BiStream(stream MsgRequest)returns(stream MsgReply){}
}

message MsgRequest {
  string data = 1;
}

message MsgReply {
  string data = 1;
}

服务端

定义服务,实现对应流接口:

type server struct {
}

//服务端->客户端 单向流
func (*server) GetStream(req *proto.MsgRequest, getServer proto.Stream_GetStreamServer) error {
	log.Println("GetServer Start.")
	i := 0
	for i < 10 {
		i++
		getServer.Send(&proto.MsgReply{Data: req.Data + ":" + fmt.Sprintf("%v", time.Now().Unix())})
		log.Println("Get Res Send.")
		time.Sleep(1 * time.Second)
	}
	log.Println("GetServer Start.")
	return nil
}

//客户端->服务端 单向流
func (*server) PutStream(putServer proto.Stream_PutStreamServer) error {
	log.Println("PutServer Start.")
	var cliStr strings.Builder
	for {
		if putReq, err := putServer.Recv(); err == nil {
			log.Println("Put Req: " + putReq.Data)
			cliStr.WriteString(putReq.Data)
		} else {
			putServer.SendAndClose(&proto.MsgReply{Data: "Finish. Your Data is: " + cliStr.String()})
			break
		}
	}
	log.Println("PutServer Done.")
	return nil
}

//双向流
func (*server) BiStream(biServer proto.Stream_BiStreamServer) error {
	log.Println("BiServer Start.")
	wg := sync.WaitGroup{}
	wg.Add(2)
	go func() {
		for {
			biReq, err := biServer.Recv()
			if err != nil {
				if err == io.EOF {
					log.Printf("[INFO] recv end")
				}
				break
			} else {
				log.Println("Bi Req: " + biReq.Data)
			}
		}
		wg.Done()
	}()

	go func() {
		for {
			err := biServer.Send(&proto.MsgReply{Data: "ok"})
			if err != nil {
				break
			} else {
				log.Println("Bi Res: ok")
				time.Sleep(time.Second)
			}
		}
		wg.Done()
	}()

	wg.Wait()
	log.Println("BiServer Done.")
	return nil
}

启动服务:

func main() {
	//监听端口
	lis, err := net.Listen("tcp", ":5902")
	if err != nil {
		log.Fatalf("failed to listen: %v", err)
	}
	
	//创建一个grpc 服务器
	s := grpc.NewServer()
	//注册事件
	proto.RegisterStreamServer(s, &server{})
	
	// 注册服务端反射服务
	// reflection.Register(s)
	
	//处理链接
	s.Serve(lis)
}

客户端

调用对应接口成功后,会返回对应流对象:

func main() {
	//新建grpc连接
	grpcConn, err := grpc.Dial(":5902", grpc.WithInsecure())
	if err != nil {
		log.Fatalln(err)
	}
	defer grpcConn.Close()

	//通过连接 生成一个client对象。
	c := proto.NewStreamClient(grpcConn)

	//设置超时
	//ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	//defer cancel()
	ctx := context.Background()

	
	//调用服务端推送流,获取服务端流数据
	log.Println("GetStream:")
	getClient, err := c.GetStream(ctx, &proto.MsgRequest{Data: "Get Time"})
	if err != nil {
		log.Fatalln(err)
		return
	}
	for {
		aa, err := getClient.Recv()
		if err != nil {
			if err == io.EOF {
				log.Printf("[INFO] recv end")
			}
			break
		}
		log.Println("Get Res Data: " + aa.Data)
	}

	
	//客户端推送流
	log.Println("PutStream:")
	putClient, err := c.PutStream(ctx)
	if err != nil {
		log.Fatalln(err)
		return
	}
	i := 1
	for i < 4 {
		i++
		var putData = proto.MsgRequest{Data: "Put " + strconv.Itoa(i) + " "}
		log.Println("Put Req Data: " + putData.Data)
		putClient.Send(&putData)
		time.Sleep(time.Second)
	}
	putRes, err := putClient.CloseAndRecv()
	if err != nil {
		log.Fatalln(err)
	}
	log.Printf("Put Done. Res is %v", putRes.Data)

	
	//双向流
	log.Println("BiStream:")
	//设置结束等待
	done := make(chan struct{})
	biClient, err := c.BiStream(ctx)
	if err != nil {
		log.Fatalln(err)
		return
	}
	go func() {
		for {
			biRes, err := biClient.Recv()
			if err != nil {
				return
			} else {
				log.Println("Bi Res Data: " + biRes.Data)
			}
		}
	}()

	go func() {
		i := 1
		for i < 4 {
			i++
			biReq := proto.MsgRequest{Data: "send " + strconv.Itoa(i) + " "}
			log.Println("Bi Req Data: " + biReq.Data)
			biClient.Send(&biReq)
			time.Sleep(time.Second)
		}
        biClient.CloseSend()
		done <- struct{}{}
	}()

	<-done
	log.Println("All Done.")
}

proto3

proto3用于构建 protocol buffer 数据:包括 .proto 文件语法和如何基于该 .proto 文件生成数据访问类。

所有数据结构以message来定义:

  • 每个字段必须指定具体类型(可以是简单类型,也可以是message);
  • 每个字段都需分配唯一的标识编号,投入到使用后不应该被更改:
    • 对于1~15编号的字段,只需要一个字节编码;
    • 对于16~2047编号的字段,则需要两个字节编码;
    • 字段号的范围为:1~536870911(2^29-1)。其中19000~19999为ProtocolBuffer自己预留(reserved)的字段号不能使用。
  • 字段可以使用两种规则描述:
    • 单一的(singular):0个或1个,不用在字段定义中指出;
    • 重复的(repeated):0个到多个,需要在字段定义中指出。
  • 默认值,消息被解析时,若字段并没有被赋值,将会被设为默认值:
    • string:空串
    • bytes:空的bytes序列
    • bool:false
    • 数字类型:0
    • 枚举类型:默认值为枚举类型中定义的第一个值,也就是0
    • 消息类型(message):取决于所编译的语言; 对于repeated,为空的list。

数据类型

消息标量字段可以是以下类型之一:

.protoC++JavaPythonGoC#Notes
doubledoubledoublefloatfloat64double
floatfloatfloatfloatfloat32float
int32int32intintint32int使用变长编码,对于负值的效率很低,如果你的域有可能有负值,请使用sint64替代
uint32uint32intint/longuint32uint使用变长编码
uint64uint64longint/longuint64ulong使用变长编码
sint32int32intintint32int使用变长编码,这些编码在负值时比int32高效的多
sint64int64longint/longint64long使用变长编码,有符号的整型值。编码时比通常的int64高效。
fixed32uint32intintuint32uint总是4个字节,如果数值总是比总是比228大的话,这个类型会比uint32高效。
fixed64uint64longint/longuint64ulong总是8个字节,如果数值总是比总是比256大的话,这个类型会比uint64高效。
sfixed32int32intintint32int总是4个字节
sfixed64int64longint/longint64long总是8个字节
boolboolbooleanboolboolbool
stringstringStringstr/unicodestringstring一个字符串必须是UTF-8编码或者7-bit ASCII编码的文本。
bytesstringByteStringstr[]byteByteString可能包含任意顺序的字节数据。
 类似资料: