当前位置: 首页 > 知识库问答 >
问题:

Golang grpc ServerStream引发错误:传输:流已完成或已调用WriteHeader

吴安和
2023-03-14

我的grpc服务器流函数出现了一个有趣的错误,我快崩溃了。从grpc godoc或其他在线阅读中无法找到任何可能的原因。希望更熟悉Go和grpc流的人能够为我指明正确的方向。

我的实现尝试遵循grpc.io网站上的基本服务器流示例。

有问题的protobuf定义:

service ArrayBasedCache {
  ...
  rpc GetRecord (GetRecordRequest) returns (stream MessageResponse) {}
}

message GetRecordRequest {
  string key = 1;
}

message MessageResponse {
  string message = 1;
}

func (ctlr *cacheClientController) GetRecord(req *svcgrpc.GetRecordRequest, stream svcgrpc.ArrayBasedCache_GetRecordServer) error {
    key := req.GetKey()
    if ctlr.inputChannels[key] == nil {
        return errors.New("Requested record has expired")
    }
    msgs, e1 := ctlr.client.ReadArrayRecord(key)
    if e1 != nil {
        panic(e1)
    }
    log.Printf("Messages: %v", msgs)
    for i := 0; i < len(msgs); i++ {
        log.Printf("trying to write message: %v", msgs[i])
        if e2 := stream.Send(&svcgrpc.MessageResponse{Message: msgs[i]}); e2 != nil {
            log.Printf("Writing message %d of %d to stream failed", i+1, len(msgs))
            panic(e2)
        }
    }
    return nil
}

func (s *GrpcService) GetRecord(key string) (svcgrpc.ArrayBasedCache_GetRecordClient, error) {
    req := &svcgrpc.GetRecordRequest{Key: key}
    ctx, cancelFunc := context.WithTimeout(context.Background(), defaultTimeout)
    defer cancelFunc()
    resp, err := s.grpcClient.GetRecord(ctx, req)
    if err != nil {
        return nil, err
    }
    return resp, nil
}

func (s *GrpcService) StreamToArray(stream svcgrpc.ArrayBasedCache_GetRecordClient) ([]string, error) {
    out := []string{}
    for {
        msg, err := stream.Recv()
        if err == io.EOF {
            break
        }
        if err != nil {
            return nil, errors.New("Unexpected error reading stream")
        }
        out = append(out, msg.Message)
    }
    return out, nil
}
stream, err := c.GetRecord(testKey)
    if err != nil {
        log.Printf("Retrieving record stream %v failed", testKey)
    }
    arr, err := c.StreamToArray(stream)
    if err != nil {
        log.Printf("Failed to unmarshall message stream to array")
    }
    log.Printf("Retrieved messages %v", arr)
2020/10/26 22:43:36 Messages: [abc def hij]
2020/10/26 22:43:36 trying to write message: abc
2020/10/26 22:43:36 Writing message 1 of 3 to stream failed
panic: rpc error: code = Internal desc = transport: transport: the stream is done or WriteHeader was already called

goroutine 82 [running]:
github.com/TasSM/app/service/controller.(*cacheClientController).GetRecord(0xc0001e4cc0, 0xc000176fc0, 0xd6ef80, 0xc00022e5f0, 0xc0001e4cc0, 0xc00039fa00)
        C:/Users/myuser/Documents/repos/app/src/service/controller/controller.go:96 +0x5ad
github.com/TasSM/app/service/svcgrpc._ArrayBasedCache_GetRecord_Handler(0xc11a80, 0xc0001e4cc0, 0xd6ca00, 0xc0001dc300, 0x11dec30, 0xc000392000)
        C:/Users/myuser/Documents/repos/app/src/service/svcgrpc/cacheservice_grpc.pb.go:192 +0x13d
google.golang.org/grpc.(*Server).processStreamingRPC(0xc0001a7a40, 0xd70300, 0xc000108c00, 0xc000392000, 0xc0001e4de0, 0x119fb00, 0x0, 0x0, 0x0)
        C:/Users/myuser/go/pkg/mod/google.golang.org/grpc@v1.33.0/server.go:1457 +0x15cb
google.golang.org/grpc.(*Server).handleStream(0xc0001a7a40, 0xd70300, 0xc000108c00, 0xc000392000, 0x0)
        C:/Users/myuser/go/pkg/mod/google.golang.org/grpc@v1.33.0/server.go:1537 +0x1309
google.golang.org/grpc.(*Server).serveStreams.func1.2(0xc000123140, 0xc0001a7a40, 0xd70300, 0xc000108c00, 0xc000392000)
        C:/Users/myuser/go/pkg/mod/google.golang.org/grpc@v1.33.0/server.go:871 +0xe0
created by google.golang.org/grpc.(*Server).serveStreams.func1
        C:/Users/myuser/go/pkg/mod/google.golang.org/grpc@v1.33.0/server.go:869 +0x349
exit status 2

共有1个答案

阙繁
2023-03-14

我认为问题出在GetRecordgrpc客户机实现中,特别是您正在使用的上下文实现中。

通过使用context.withtimeout并在同一方法中调用defer cancelFunc(),基本上是在从getrecord方法返回之前关闭流。

如果您希望仍然使用Context.WithTimeout实现,请不要在GetRecord方法中使用CancelFunc,而是返回CancelFunc,或者将CTX传递给GetRecord方法。

 类似资料:
  • 看来是从到达这里的,我也在我的项目中使用它 但这只是一个假设。问题可能出在别的地方。有什么想法吗?

  • 当我尝试将导出的转储文件(特别是我的数据库/模式)导入到其他计算机时,我遇到了这个错误; d:\ CAPSTONE SYSTEM \ mydatabasepstone \ dump 2015 09 22 \ schm _ CAPSTONE SYSTEM _ routines . SQL不包含架构/表信息16:58:55还原schm _ CAPSTONE SYSTEM(employee _ entr

  • 这里是我的AsyncTask方法: 如何调用profileDefaults()?是我的活动。我需要在OnPostExecute之后调用它!

  • 即使我在我的流中使用Supplier并在每次我想检索我的strem并对其执行终端操作时使用Supplier.get(),我仍然得到“Stream已经被操作或关闭”异常。有人能看看我的代码并建议我做错了什么吗? 引发异常的方法: 流供应商:

  • 问题内容: 我收到警告:以下代码行: 它是什么原因以及如何解决? 问题答案: 从任何地方删除,这是不需要的。实际上,我认为您可以删除此代码中的所有位置-完全不需要。 详细说明 PHP允许通过两种方式传递变量:“按值”和“按引用”。第一种方式(“按值”)不能修改,而第二种方式(“按引用”)可以: 注意标志。如果我调用一个变量,它将被修改,如果我调用,则在返回值之后,参数的值将是相同的。 通过执行以下

  • 我已经在以下所有链接上试用了该解决方案:Gradle Build Errors Android Studio分级错误-preDexDebug Android java.exe已完成,退出值为非零%1 已完成,退出值为非零 执行任务失败:“:app:PredexDebug” http://fqa.io/questions/29045129/android-java-exe-finished-with