从 https://github.com/google/protobuf/releases下载预编译的“protoc编译器”,用于生成gRPC服务代码。
解压zip文件,并将protoc二进制文件所在bin目录添加到PATH环境变量中。
安装好golang后,设定环境:
go env -w GO111MODULE=auto
go env -w GOPROXY=https://goproxy.cn
安装proto-go,命令如下:
go get -u github.com/golang/protobuf/proto
go get -u github.com/golang/protobuf/protoc-gen-go
因被墙,可无法直接安装:
git clone https://github.com/golang/protobuf.git
# 进入protobuf/protoc-gen-go目录
go build
go install
# 此时,在$GOPATH下bin目录下会有protoc-gen-go (把此目录添加到PATH环境变量)
要生成grpc,有三步:
// 在proto文件中,需要设定包名
option go_package = "./;mytest"; //路径和包名
# 编写好后,通过protoc生成对应代码():
./protoc --go_out=plugins=grpc:proto -I proto/ ./proto/product.proto
命令参数说明:
-I
:指定proto文件存放目录;若文件不在当前目录下,则是必须的;--go_out=plugins=grpc:
:指定生成go的代码,并且冒号后指定go代码的存放目录;gRPC 里客户端应用可以像调用本地对象一样直接调用远端服务上的应用方法,使得创建分布式应用和服务更简单。其基于以下理念:
先定义个简单的请求与相应示例
syntax = "proto3";
package service;
option go_package = "./;product";
service Greeter {
rpc SayHello (HelloRequest) returns (HelloReply) {}
}
message HelloRequest {
string name = 1;
}
message HelloReply {
string message = 1;
}
把示例文件放在proto目录下,执行protoc命令:
./protoc --go_out=plugins=grpc:proto -I proto/ ./proto/product.proto
会生成product.pb.go文件,里面包含服务端与客户端的代码。
要实现proto中的服务,需要实现GreeterServer接口:
type GreeterServer interface {
SayHello(context.Context, *HelloRequest) (*HelloReply, error)
}
先实现一个对应服务:
type server struct {
}
func (s *server) SayHello(ctx context.Context, in *product.HelloRequest) (*product.HelloReply, error) {
log.Println("Input:", in)
return &product.HelloReply{Message: "Hello " + in.Name}, nil
}
然后创建gRPC服务:
func main() {
// 创建 Tcp 连接
listen, err := net.Listen("tcp", ":5901")
if err != nil {
log.Println("Listen failed:", err)
return
}
rpcServer := grpc.NewServer()
// 注册服务实现者
// 此函数在.pb.go中,自动生成
product.RegisterGreeterServer(rpcServer, &server{})
// 在 gRPC 服务上注册反射服务
// 以便可通过grpcurl工具来查询服务列表或调用grpc方法
// reflection.Register(grpcServer)
err = grpcServer.Serve(listener)
if err != nil {
log.Fatalf("Start RPC Server fail: %v", err)
}
}
客户端可通过Dial方便地连接,若要做一些参数配置,则需要通过DialContext:
func main() {
//conn, err := grpc.Dial(":5901", grpc.WithInsecure())
conn, err := grpc.DialContext(context.Background(), ":5901",
grpc.WithInsecure(),
grpc.WithKeepaliveParams(keepalive.ClientParameters{
Time: 10 * time.Second, // send pings every 10 seconds if there is no activity
Timeout: 2 * time.Second, // wait 2 second for ping ack before considering the connection dead
PermitWithoutStream: true, // send pings even without active streams
}))
if err != nil {
log.Println("Connect fail:", err)
//return
}
defer conn.Close()
count := 0
client := product.NewGreeterClient(conn)
for {
time.Sleep(5 * time.Second)
count++
log.Println("Try", count)
reply, err := client.SayHello(context.Background(), &product.HelloRequest{Name: "Mike" + strconv.Itoa(count)})
if err != nil {
log.Println("Call fail:", err)
continue
}
log.Println("Reply:", reply)
}
}
相比于简单gRPC,流式(stream)gRPC实现发送/接收大量数据,或不断传输数据的场景。可分为:
return nil
代表响应完成;err == io.EOF
判断服务端是否响应完成;CloseAndRecv
**关闭stream 并接收服务端响应;err == io.EOF
判断客户端是否发送完毕,完毕后使用**SendAndClose
**关闭 stream并返回响应;CloseSend
**关闭流;通过err == io.EOF
判断服务端是否响应完成(接收完数据);return nil
表示已经完成响应;定义数据结构:
syntax = "proto3";
option go_package = ".;proto";
//三个流式rpc
//GetStream服务器返回流
//PutStream客户端上传流
//DiStream双向流
service Stream{
rpc GetStream(MsgRequest)returns(stream MsgReply){}
rpc PutStream(stream MsgRequest)returns(MsgReply){}
rpc BiStream(stream MsgRequest)returns(stream MsgReply){}
}
message MsgRequest {
string data = 1;
}
message MsgReply {
string data = 1;
}
定义服务,实现对应流接口:
type server struct {
}
//服务端->客户端 单向流
func (*server) GetStream(req *proto.MsgRequest, getServer proto.Stream_GetStreamServer) error {
log.Println("GetServer Start.")
i := 0
for i < 10 {
i++
getServer.Send(&proto.MsgReply{Data: req.Data + ":" + fmt.Sprintf("%v", time.Now().Unix())})
log.Println("Get Res Send.")
time.Sleep(1 * time.Second)
}
log.Println("GetServer Start.")
return nil
}
//客户端->服务端 单向流
func (*server) PutStream(putServer proto.Stream_PutStreamServer) error {
log.Println("PutServer Start.")
var cliStr strings.Builder
for {
if putReq, err := putServer.Recv(); err == nil {
log.Println("Put Req: " + putReq.Data)
cliStr.WriteString(putReq.Data)
} else {
putServer.SendAndClose(&proto.MsgReply{Data: "Finish. Your Data is: " + cliStr.String()})
break
}
}
log.Println("PutServer Done.")
return nil
}
//双向流
func (*server) BiStream(biServer proto.Stream_BiStreamServer) error {
log.Println("BiServer Start.")
wg := sync.WaitGroup{}
wg.Add(2)
go func() {
for {
biReq, err := biServer.Recv()
if err != nil {
if err == io.EOF {
log.Printf("[INFO] recv end")
}
break
} else {
log.Println("Bi Req: " + biReq.Data)
}
}
wg.Done()
}()
go func() {
for {
err := biServer.Send(&proto.MsgReply{Data: "ok"})
if err != nil {
break
} else {
log.Println("Bi Res: ok")
time.Sleep(time.Second)
}
}
wg.Done()
}()
wg.Wait()
log.Println("BiServer Done.")
return nil
}
启动服务:
func main() {
//监听端口
lis, err := net.Listen("tcp", ":5902")
if err != nil {
log.Fatalf("failed to listen: %v", err)
}
//创建一个grpc 服务器
s := grpc.NewServer()
//注册事件
proto.RegisterStreamServer(s, &server{})
// 注册服务端反射服务
// reflection.Register(s)
//处理链接
s.Serve(lis)
}
调用对应接口成功后,会返回对应流对象:
func main() {
//新建grpc连接
grpcConn, err := grpc.Dial(":5902", grpc.WithInsecure())
if err != nil {
log.Fatalln(err)
}
defer grpcConn.Close()
//通过连接 生成一个client对象。
c := proto.NewStreamClient(grpcConn)
//设置超时
//ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
//defer cancel()
ctx := context.Background()
//调用服务端推送流,获取服务端流数据
log.Println("GetStream:")
getClient, err := c.GetStream(ctx, &proto.MsgRequest{Data: "Get Time"})
if err != nil {
log.Fatalln(err)
return
}
for {
aa, err := getClient.Recv()
if err != nil {
if err == io.EOF {
log.Printf("[INFO] recv end")
}
break
}
log.Println("Get Res Data: " + aa.Data)
}
//客户端推送流
log.Println("PutStream:")
putClient, err := c.PutStream(ctx)
if err != nil {
log.Fatalln(err)
return
}
i := 1
for i < 4 {
i++
var putData = proto.MsgRequest{Data: "Put " + strconv.Itoa(i) + " "}
log.Println("Put Req Data: " + putData.Data)
putClient.Send(&putData)
time.Sleep(time.Second)
}
putRes, err := putClient.CloseAndRecv()
if err != nil {
log.Fatalln(err)
}
log.Printf("Put Done. Res is %v", putRes.Data)
//双向流
log.Println("BiStream:")
//设置结束等待
done := make(chan struct{})
biClient, err := c.BiStream(ctx)
if err != nil {
log.Fatalln(err)
return
}
go func() {
for {
biRes, err := biClient.Recv()
if err != nil {
return
} else {
log.Println("Bi Res Data: " + biRes.Data)
}
}
}()
go func() {
i := 1
for i < 4 {
i++
biReq := proto.MsgRequest{Data: "send " + strconv.Itoa(i) + " "}
log.Println("Bi Req Data: " + biReq.Data)
biClient.Send(&biReq)
time.Sleep(time.Second)
}
biClient.CloseSend()
done <- struct{}{}
}()
<-done
log.Println("All Done.")
}
proto3用于构建 protocol buffer 数据:包括 .proto 文件语法和如何基于该 .proto 文件生成数据访问类。
所有数据结构以message来定义:
1~15
编号的字段,只需要一个字节编码;16~2047
编号的字段,则需要两个字节编码;1~536870911(2^29-1)
。其中19000~19999
为ProtocolBuffer自己预留(reserved)的字段号不能使用。消息标量字段可以是以下类型之一:
.proto | C++ | Java | Python | Go | C# | Notes |
---|---|---|---|---|---|---|
double | double | double | float | float64 | double | |
float | float | float | float | float32 | float | |
int32 | int32 | int | int | int32 | int | 使用变长编码,对于负值的效率很低,如果你的域有可能有负值,请使用sint64替代 |
uint32 | uint32 | int | int/long | uint32 | uint | 使用变长编码 |
uint64 | uint64 | long | int/long | uint64 | ulong | 使用变长编码 |
sint32 | int32 | int | int | int32 | int | 使用变长编码,这些编码在负值时比int32高效的多 |
sint64 | int64 | long | int/long | int64 | long | 使用变长编码,有符号的整型值。编码时比通常的int64高效。 |
fixed32 | uint32 | int | int | uint32 | uint | 总是4个字节,如果数值总是比总是比228大的话,这个类型会比uint32高效。 |
fixed64 | uint64 | long | int/long | uint64 | ulong | 总是8个字节,如果数值总是比总是比256大的话,这个类型会比uint64高效。 |
sfixed32 | int32 | int | int | int32 | int | 总是4个字节 |
sfixed64 | int64 | long | int/long | int64 | long | 总是8个字节 |
bool | bool | boolean | bool | bool | bool | |
string | string | String | str/unicode | string | string | 一个字符串必须是UTF-8编码或者7-bit ASCII编码的文本。 |
bytes | string | ByteString | str | []byte | ByteString | 可能包含任意顺序的字节数据。 |