双向流式RPC即客户端和服务端均为流式的RPC,能发送多个请求对象也能接收到多个响应对象。典型应用示例:聊天应用等。
我们这里还是编写一个客户端和服务端进行人机对话的双向流式RPC示例。
1.定义服务
// 双向流式数据
rpc BidiHello(stream HelloRequest) returns (stream HelloResponse);
修改.proto文件后,需要重新使用 protocol buffers编译器生成客户端和服务端代码。
2.服务端实现BidiHello方法。
func (s * server) BidHello(stream pb.Greeter_BidiHelloServer)error{
for {
//接收流式请求
in, err := stream.Recv()
if err == io.EOF {
return nil
}
if err != nil {
return err
}
reply := magic(in.GetName()) // 对收到的数据做些处理
// 返回流式响应
if err := stream.Send(&pb.HelloResponse{Reply: reply}); err != nil {
return err
}
}
}
这里我们还定义了一个处理数据的magic函数,其内容如下。
// magic 一段价值连城的“人工智能”代码
func magic(s string) string {
s = strings.ReplaceAll(s, "吗", "")
s = strings.ReplaceAll(s, "吧", "")
s = strings.ReplaceAll(s, "你", "我")
s = strings.ReplaceAll(s, "?", "!")
s = strings.ReplaceAll(s, "?", "!")
return s
}
3.客户端调用BidiHello方法,一边从终端获取输入的请求数据发送至服务端,一边从服务端接收流式响应。
func runBidHello(c pb.GreeterClient){
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute)
defer cancel()
// 双向流模式
stream, err := c.BidiHello(ctx)
if err != nil {
log.Fatalf("c.BidiHello failed, err: %v", err)
}
waitc := make(chan struct{})
go func() {
for {
// 接收服务端返回的响应
in, err := stream.Recv()
if err == io.EOF {
// read done.
close(waitc)
return
}
if err != nil {
log.Fatalf("c.BidiHello stream.Recv() failed, err: %v", err)
}
fmt.Printf("AI:%s\n", in.GetReply())
}
}()
// 从标准输入获取用户输入
reader := bufio.NewReader(os.Stdin) // 从标准输入生成读对象
for {
cmd, _ := reader.ReadString('\n') // 读到换行
cmd = strings.TrimSpace(cmd)
if len(cmd) == 0 {
continue
}
if strings.ToUpper(cmd) == "QUIT" {
break
}
// 将获取到的数据发送至服务端
if err := stream.Send(&pb.HelloRequest{Name: cmd}); err != nil {
log.Fatalf("c.BidiHello stream.Send(%v) failed: %v", cmd, err)
}
}
stream.CloseSend()
<-waitc
}
流式RPC调用metadata示例
这里以双向流式RPC为例演示客户端和服务端如何进行metadata操作。
client端的metadata操作
下面的代码片段演示了client端在服务端流式RPC模式下如何设置和获取metadata。
// bidirectionalWithMetadata 流式RPC调用客户端metadata操作
func bidirectionalWithMetadata(c pb.GreeterClient, name string) {
// 创建metadata和context.
md := metadata.Pairs("token", "app-test-q1mi")
ctx := metadata.NewOutgoingContext(context.Background(), md)
// 使用带有metadata的context执行RPC调用.
stream, err := c.BidiHello(ctx)
if err != nil {
log.Fatalf("failed to call BidiHello: %v\n", err)
}
go func() {
// 当header到达时读取header.
header, err := stream.Header()
if err != nil {
log.Fatalf("failed to get header from stream: %v", err)
}
// 从返回响应的header中读取数据.
if l, ok := header["location"]; ok {
fmt.Printf("location from header:\n")
for i, e := range l {
fmt.Printf(" %d. %s\n", i, e)
}
} else {
log.Println("location expected but doesn't exist in header")
return
}
// 发送所有的请求数据到server.
for i := 0; i < 5; i++ {
if err := stream.Send(&pb.HelloRequest{Name: name}); err != nil {
log.Fatalf("failed to send streaming: %v\n", err)
}
}
stream.CloseSend()
}()
// 读取所有的响应.
var rpcStatus error
fmt.Printf("got response:\n")
for {
r, err := stream.Recv()
if err != nil {
rpcStatus = err
break
}
fmt.Printf(" - %s\n", r.Reply)
}
if rpcStatus != io.EOF {
log.Printf("failed to finish server streaming: %v", rpcStatus)
return
}
// 当RPC结束时读取trailer
trailer := stream.Trailer()
// 从返回响应的trailer中读取metadata.
if t, ok := trailer["timestamp"]; ok {
fmt.Printf("timestamp from trailer:\n")
for i, e := range t {
fmt.Printf(" %d. %s\n", i, e)
}
} else {
log.Printf("timestamp expected but doesn't exist in trailer")
}
}
server端的metadata操作
下面的代码片段演示了server端在服务端流式RPC模式下设置和操作metadata。
// BidirectionalStreamingSayHello 流式RPC调用客户端metadata操作
func (s *server) BidirectionalStreamingSayHello(stream pb.Greeter_BidiHelloServer) error {
// 在defer中创建trailer记录函数的返回时间.
defer func() {
trailer := metadata.Pairs("timestamp", strconv.Itoa(int(time.Now().Unix())))
stream.SetTrailer(trailer)
}()
// 从client读取metadata.
md, ok := metadata.FromIncomingContext(stream.Context())
if !ok {
return status.Errorf(codes.DataLoss, "BidirectionalStreamingSayHello: failed to get metadata")
}
if t, ok := md["token"]; ok {
fmt.Printf("token from metadata:\n")
for i, e := range t {
fmt.Printf(" %d. %s\n", i, e)
}
}
// 创建和发送header.
header := metadata.New(map[string]string{"location": "X2Q"})
stream.SendHeader(header)
// 读取请求数据发送响应数据.
for {
in, err := stream.Recv()
if err == io.EOF {
return nil
}
if err != nil {
return err
}
fmt.Printf("request received %v, sending reply\n", in)
if err := stream.Send(&pb.HelloResponse{Reply: in.Name}); err != nil {
return err
}
}
}