当前位置: 首页 > 工具软件 > grpc-go > 使用案例 >

gRPC在Go中的使用

谈渊
2023-12-01

环境
环境的搭建此处省略一万句,百度上有无数资料

开始使用

  • 编写proto文件
    什么是proto文件?proto文件来预先定义的消息格式。数据包是按照proto文件所定义的消息格式完成二进制码流的编码和解码,简单来说可以类比为API文档,请求与返回的结构,工作流模式都是事先定义好的。实例如下,我们编写文件rpc.proto
syntax = "proto3";

option go_package = "./;pb";

service Greeter {
    // 定义方法,stream是常量,流模式
    rpc ServerStream (StreamRequestData) returns (stream StreamResponseData);      //服务端流模式,拉消息
    rpc ClientStream (stream StreamRequestData) returns (StreamResponseData);      //客户端流模式,推消息
    rpc AllStream (stream StreamRequestData) returns (stream StreamResponseData);  //双向流模式,能推能拉
    rpc OneStream (StreamRequestData) returns (StreamResponseData); //普通流模式
}

message StreamRequestData {
    string data = 1; //编号
}
message StreamResponseData {
    string data = 1; //编号
}

syntax:指定使用的Protobuf协议版本,类似于在写python代码时开头加上python2/3
option go_package:生成的代码放入指定的package中,这里使用的是Go,所以是go_package
service:用于定义方法,结构内部为rpc + 函数名 (形参) + returns + (返回结构),在参数中有关键字 Stream,在参数前加入此关键字决定了该函数的工作流模式
message:声明了消息体,内部结构可以是基础的类型,也可以再包含一个message作复合类型,每个参数后面的数字代表了序号,这决定了在Protobuf数据包的序列化、反序列化时,该字段的具体排序。

工作流模式:普通流模式客户端流模式,服务端流模式,双向流模式
(以下用粗糙的场景进行类比,方便理解但不一定合适)
普通流模式:类似用调用自己代码里的函数,亦或者一个简单个http请求,带着参数发送请求,得到结果
客户端流模式:由客户端不断给服务端发送消息流,由服务端的逻辑来决定什么时候响应客户端
服务端流模式:在客户端发起请求后,服务端不断的给客户端推消息流,类似于视频预览的请求,由客户端发起预览请求,服务端不断的推送视频数据(当然gRPC并不适用于视频的解析)
双向流模式:类似聊天室,websocket,互相发送消息,一个全双工的通信

生成代码

使用protoc命令生成gRPC代码,例如

protoc --go_out=. --go-grpc_out=. rpc.proto

xxx_out后指定的是路径,路径为当前终端打开位置的相对路径,这里有一个,指定路径时./要与文件名之间有一个空格!!!

根据以上可以生成rpc.pb.gorpc_grpc.pb.go两个文件,到此可以开始编写服务端与客户端的代码了
生成的代码IDE有可能会报错,升级一下IDE或者proto插件版本即可解决

编写逻辑代码

  • 客户端创建连接
	//创建连接,使用忽略安全的模式,绑定端口8080
	conn, err := grpc.Dial("127.0.0.1:8080", grpc.WithTransportCredentials(insecure.NewCredentials()))
	if err != nil {
		panic(err.Error())
	}

	//关闭连接
	defer conn.Close()

	//创建客户端
	rpc := pb.NewGreeterClient(conn)

	//到此可以使用创建的客户端对象rpc来调用方法
	//rpc.ServerStream(xxxxxx)

各个方法的传参可以到生成的rpc_grpc.pb.go中查看

type GreeterClient interface {
	// 定义方法,stream是常量,流模式
	ServerStream(ctx context.Context, in *StreamRequestData, opts ...grpc.CallOption) (Greeter_ServerStreamClient, error)
	ClientStream(ctx context.Context, opts ...grpc.CallOption) (Greeter_ClientStreamClient, error)
	AllStream(ctx context.Context, opts ...grpc.CallOption) (Greeter_AllStreamClient, error)
	OneStream(ctx context.Context, in *StreamRequestData, opts ...grpc.CallOption) (*StreamResponseData, error)
}
  • 服务端监听端口
type GreeterServer struct {
}
//例子,在rpc_grpc.pb.go的interface中声明的接口此处要全部实现
func (this *GreeterServer) ClientStream(cli pb.Greeter_ClientStreamServer) error {
}

func main() {
	g := grpc.NewServer()
	pb.RegisterGreeterServer(g, new(GreeterServer))
	list, _ := net.Listen("tcp", "127.0.0.1:8080")
	g.Serve(list)
}

接口中函数的声明,其形参和返回参数可以到rpc_grpc.pb.go文件中查看

type GreeterServer interface {
	// 定义方法,stream是常量,流模式
	ServerStream(*StreamRequestData, Greeter_ServerStreamServer) error
	ClientStream(Greeter_ClientStreamServer) error
	AllStream(Greeter_AllStreamServer) error
	OneStream(context.Context, *StreamRequestData) (*StreamResponseData, error)
	MustEmbedUnimplementedGreeterServer() //这里不知道为什么生成的时候是小写的,又很蛋疼的必须实现,手动改为大写
}
  • 单向流模式
    客户端
	params := new(pb.StreamRequestData)
	params.Data = "hello"
	ctr, _ := rpc.OneStream(context.Background(), params)

服务端

func (this *GreeterServer) OneStream(ctx context.Context, req *pb.StreamRequestData) (*pb.StreamResponseData, error) {
	fmt.Println(req.GetData())
	return &pb.StreamResponseData{
		Data: "heiheihei",
	}, nil
}
  • 客户端流模式
    客户端

    puts, _ := rpc.ClientStream(context.Background())
    	for i := 0; i < 10; i++ {
    	//循环向服务端发送消息
    		puts.Send(&pb.StreamRequestData{
    			Data: "xxx",
    		})
    	}
    

    服务端

    func (this *GreeterServer) ClientStream(cli pb.Greeter_ClientStreamServer) error {
    	for {
    		a, err := cli.Recv()
    		if err != nil {
    			fmt.Println(err)
    			return nil
    		}
    		fmt.Println(a)
    	}
    	return nil
    }
    
  • 服务端端流模式
    客户端

    	params := new(pb.StreamRequestData)
    	params.Data = "xxx"
    	rec, _ := rpc.ServerStream(context.Background(), params)
    	for {
    		//循环接收服务端的消息
    		data, _ := rec.Recv()
    		fmt.Println(data.GetData())
    	}
    

    服务端

    	func (this *GreeterServer) ServerStream(req *pb.StreamRequestData, rep pb.Greeter_ServerStreamServer) error {
    	if req.GetData() == "xxx" {
    		for {
    			params := new(pb.StreamResponseData)
    			params.Data = "heiheihei"
    			//循环向客户端发送消息
    			rep.Send(params)
    		}
    	}
    	return nil
    }
    
  • 双向流模式
    客户端

    	allStr, _ := rpc.AllStream(context.Background())
    	wg := sync.WaitGroup{}
    	wg.Add(1)
    	//接受服务端消息的协程
    	go func() {
    		defer wg.Done()
    		for {
    			//业务代码
    			res, err := allStr.Recv()
    			if err != nil {
    				fmt.Println("服务端流数据发送over:", err)
    				break
    			}
    			fmt.Println("服务端消息:", res.Data)
    		}
    	}()
    	//发送消息给服务端的协程
    	go func() {
    		defer wg.Done()
    		i := 0
    		for {
    			i++
    			//业务代码
    			_ = allStr.Send(&pb.StreamRequestData{
    				Data: fmt.Sprintf("hello"),
    			})
    			time.Sleep(time.Second * 1)
    			if i > 10 {
    				break
    			}
    		}
    	}()
    	wg.Wait()
    

    服务端

    		func (this *GreeterServer) AllStream(allStr pb.Greeter_AllStreamServer) error {
    	wg := sync.WaitGroup{}
    	wg.Add(2)
    	//接受客户端消息的协程
    	go func() {
    		defer wg.Done()
    		for {
    			//业务代码
    			res, err := allStr.Recv()
    			if err != nil {
    				fmt.Println("客户端流数据over:", err)
    				break
    			}
    			fmt.Println("收到客户端消息:", res.Data)
    		}
    	}()
    	//发送消息给客户端的协程
    	go func() {
    		defer wg.Done()
    		i := 0
    		for {
    			i++
    			//业务代码
    			_ = allStr.Send(&pb.StreamResponseData{
    				Data: fmt.Sprintf("world"),
    			})
    			time.Sleep(time.Second * 1)
    			if i > 10 {
    				break
    			}
    		}
    	}()
    	wg.Wait()
    	return nil
    }
    	}
    
 类似资料: