当前位置: 首页 > 知识库问答 >
问题:

如何取消gRPC bidi流媒体服务器的阻塞Recv()调用?

顾承平
2023-03-14

在Go语言的gRPC中提供双向流时,规范流处理程序如下所示:

func (s *MyServer) MyBidiRPC(stream somepb.MyServer_MyBidiServer) error {
    for {
        data, err := stream.Recv()
        if err == io.EOF {
            return nil // clean close
        }
        if err != nil {
            return err // some other error
        }
        // do things with data here
    }
}

具体来说,当Bidi RPC的处理程序返回时,即考虑服务器端关闭的信号。

这是一个同步编程模型——在等待来自客户端的消息时,服务器在这个goroutine(由grpc库创建)中保持阻塞状态。

现在,我想取消阻止这个Recv()调用(最终在底层grpc上调用RecvMsg()。ServerStream),并返回/关闭流,因为服务器进程已决定使用此客户端完成此操作。

不幸的是,我找不到明显的方法来做到这一点:

  • 为我的服务生成的bidi服务器界面上没有类似Close()或CloseSend()或CloseRecv()或Shutdown()的函数
  • 流中的上下文,我可以通过流获取。Context(),不公开用户可访问的取消功能
  • 对于grpc接受的新连接,我找不到在“起始端”传递上下文的方法。服务器,在那里我可以插入我自己的取消功能

我可以关闭整个grpc。通过调用Stop()调用服务器,但这不是我想要做的——只有这个特定的客户端连接(grpc.ServerStream)应该完成。

我可以向客户端发送一条消息,让客户端反过来关闭连接。但是,如果客户端已经脱离网络,这就不起作用了,这将通过超时来解决,超时必须很长才能保持总体健壮。我现在就想要它,因为我没有耐心,更重要的是,在规模上,悬挂无响应的客户端可能是一个很高的成本。

我可以(或许)翻阅grpc。ServerStream和反射,直到我找到transportStream,然后从中找出cancel函数并调用它。或者在溪流中挖掘。带有html" target="_blank">反射的Context(),并进行我自己的cancel函数引用来调用。对于未来的维护人员来说,这两种方法似乎都不可取。

但这些肯定不是唯一的选择吗?决定不再需要连接特定客户端并不是神奇的太空科学。我如何关闭这个流,以便Recv()从服务器进程端调用un-block,而不涉及到客户端的往返?

共有1个答案

羿宏硕
2023-03-14

不幸的是,我不认为有什么好方法可以满足你的要求。根据你的目标,我认为你有两个选择:

>

  • 在goroutine中运行Recv,并在需要返回时从Bidi处理程序返回。这将关闭上下文并解除屏蔽Recv。这显然是次优的,因为它需要小心,因为您现在有代码在处理程序的执行范围之外执行。然而,这似乎是我能找到的最接近的答案。

    如果您试图通过设置超时来减轻行为不端的客户端的影响,那么您可能可以将这项工作转移到具有KeepaliveEnforcementPolicy和/或KeepaliveParams的框架中。如果这与您希望关闭连接的原因一致,那么这可能是更好的选择,但在其他方面没有多大用处。

  •  类似资料:
    • 现在我们已经知道了Java NIO里面那些非阻塞特性是怎么工作的,但是要设计一个非阻塞的服务仍旧比较困难。非阻塞IO相对传统的阻塞IO给开发者带来了更多的挑战。在本节非阻塞服务的讲解中,我们一起来讨论这些会面临的主要挑战,同时也会给出一些潜在的解决方案。 查找关于设计非阻塞服务的相关资料是比较难的,本文提出的解决方案也只能是基于笔者个人的工作经验,构思。如果你有其他的解决方案或者是更好的点子,那么

    • 主要内容:1 非阻塞服务器-GitHub仓库,2 无阻塞IO管道,3 非阻塞与阻塞IO管道,4 基本的无阻塞IO管道设计,5 读取部分消息,6 存储部分消息,7 编写部分消息,8 总结,9 服务器线程模型即使你了解了Java NIO非阻塞功能如何工作(Selector,Channel, Buffer等),设计一个无阻塞服务器仍然很难。与阻塞IO相比,非阻塞IO包含多个挑战。这份非阻塞服务器教程将讨论非阻塞服务器的主要挑战,并为它们描述一些潜在的解决方案。 本教程中描述的思想是围绕Java NIO

    • 我想使用NATs流式服务器来流式传输数据,并使用Flink来处理数据。 如何使用apache flink使用NATS流媒体服务器处理实时流媒体数据?

    • 本文向大家介绍Dubbo服务之间的调用是阻塞的吗?相关面试题,主要包含被问及Dubbo服务之间的调用是阻塞的吗?时的应答技巧和注意事项,需要的朋友参考一下 默认是同步等待结果阻塞的,支持异步调用。 Dubbo 是基于 NIO 的非阻塞实现并行调用,客户端不需要启动多线程即可完成并行调用多个远程服务,相对多线程开销较小,异步调用会返回一个 Future 对象。 异步调用流程图如下。

    • 我有一个方法,可以启动一个简单的cpp grpc服务器。 我想做

    • 我已经下载了ant media server文件,并尝试启动服务器<但是,我最近了解到Ant Media Server需要java 11 所以我将java版本更新为java 11: 所以当我运行这个命令时: java版本 结果是: openjdk版本“11.0.11-ea”2021-04-20 OpenJDK运行时环境(build 11.0.11-ea 4-Ubuntu-0ubuntu3.16.0