当前位置: 首页 > 工具软件 > 流式RPC > 使用案例 >

什么?你还不了解gRPC,我不允许,开始启动你的RPC服务吧(2)

张德佑
2023-12-01

1 安装gRPC

在之前创建的项目根目录下,命令行执行gRPC库安装命令

go get -u google.golang.org/grpc@v1.29.1

2 gRPC的调用方式

包含四种调用方似乎

  • 一元RPC
  • 服务端流式RPC
  • 客户端流式RPC
  • 双向流式RPC

不同的调用方式有不同的应用场景,下面了解不同调用方式的实现和使用场景

下面约定代码实现方式

import (
...
//设置引用别名
pb "github.com/go-programming-tour-book/grpc-demo/proto"
)

var port string

func init() {
	flag.StringVar(&port, "p", "8000", "启动端口号")
	flag.Parse()
}

对应的server下的server.go和client下的client.go都是这么写的,需要注意的是package都是main,只有这样我们的main方法才可以被调用

3 一元RPC

又叫做单次RPC,简单来说就是客户端发送一次普通的RPC请求,单次的请求,有来有回

  • proto
service Greeter {
        rpc SayHello (HelloRequest) returns (HelloReply) {}
}
  • Server


type GreeterServer struct {}



func (s *GreeterServer) SayHello(ctx content.Content, r *pb.HelloRequest)
(*pb.HelloReply, error) {
        return &pb.HelloReply {Message : "hello.world"}, nil
}

func main() {
	//创建gRPC Server对象,可以把它理解为Server端的抽象对象
        server := grpc.NewServer()
    //将GreeterServer注册到gRPC Server的内部注册中心,这样在接收请求的时候,即可通过内部的服务发现发现该服务端接口
    //并且进行逻辑处理
        pb.RegisterGreeterServer(server, &GreeterServer{})
        //创建Listen,监听TCP端口
        lis, _ := net.Listen("tcp", ":"+port)
        //gRPC Server开始lis Accpet,直到stop或者gracefulStop
        server.Serve(lis)
}
  • Client
func main() {
		//创建于给定目标(服务端)的连接句柄
        conn, _ := grpc.Dial(":"+port, grpc.WithInsecure())
        defer conn.Close()
		//创建Greeter的客户端对象
        client := pb.NewGreeterClient(conn)
        //发送RPC请求,等待同步响应,得到回调以后返回响应结果
        _ = SayHello(client)
}

func SayHello (client pb.GreeterClient) error {
        resp, _ := client.SayHello(context.Background(), &pb.HelloRequest{Name:"eddycjy"})
        log.Printf("client.SayHello resp: %s", resp.Message)
        return nil
}

4 服务端流式RPC

服务端流式RPC是一个单向流,Server为Stream,Client为普通的一元RPC请求

客户端发起一次简单的RPC请求,服务端通过流式响应多次发送数据集,客户端接受数据集

  • Proto
 rpc SayList (HelloRequest) returns (stream HelloReply) {}
  • Sever
func (s *GreeterServer) SayList(r *pb.HelloRequest, stream pb.Greeter_SayListServer) error {
        for n := 0; n <= 6; n++ {
        _ = stream.Send(&pb.HelloReply{Message: "hello.list"})
        }
        return nil
}

在Server端,需要重点留意stream.Send方法,通过阅读源码,在得知protoc在生成的时候,根据定义生成了各式各样的符合标准的接口方法,最后在统一调用内部的SendMsg方法,该方法涉及以下的过程

  • 消息体序列化
  • 压缩序列化后的消息体
  • 为正在传输的消息体增加5字节的header标志位
  • 判断压缩+序列化后的消息体总字节长度是否大于预设值,超出报错
  • 写入给流的数据集

  • Client

func SayList(client pb.GreeterClient, r *pb.HelloRequest) error {
        stream, _ := client.SayList(context.Background(), r)
        for {
        resp, err := stream.Recv()
        if err == io.EOF {
        break
        }

        if err != nil {
        return err
        }

        log.Printf("resp: %v", resp)
        }
        return nil
}

5 客户端流式RPC

客户端流式RPC,也是一个单向流,客户端通过流式发起多次RPC请求给服务端,而服务端仅仅发起一次响应给客户端

  • proto
rpc SayRecord (stream HelloRequest) returns (HelloReply) {}
  • Server
func (s *GreeterServer) SayRecord(stream pb.Greeter_SayRecordServer) error {
        for {
        resp, err := stream.Recv()
        if err == io.EOF {
        message := &pb.HelloReply{Message:"say.record"}
        return stream.SendAndClose(message)
        }

        if err != nil {
        return err
        }

        log.Printf("resp: %v", resp)
        
        }

        return nil
}

对每一个Recv都进行了处理,当io流关闭的时候,需要通过stream.SendAndClose方法将最终的响应结果发送给客户端,同时关闭在另外一侧等待的Recv

  • Client
func SayRecord(client pb.GreeterClient, r *pb.HelloRequest) erro {
        stream, _ := client.SayRecord(context.Background())
        for n := 0; n <6 ; n++ {
        _ = stream.Send(r)
        }
        resp, _ := stream.CloseAndRecv()

        log.Printf("resp err: %v", resp)
        return nil
}

Server端的stream.SendAndClose方法与Client端的stream.CloseAndRecv方法是配套使用的

6 双向流式RPC

 类似资料: