当前位置: 首页 > 知识库问答 >
问题:

GRPC:保留对流的引用,以将数据发送到多个客户端

施喜
2023-03-14

我从使用go的GRPC开始。我阅读了官方文档,并举了几个例子。

在大多数示例中,您不识别客户端,而是使用流读取/写入数据。我看到上下文中有用于检索身份验证信息的应用编程接口,并且可以为聊天请求识别客户端。但是,如果我想根据客户端标识保留流的引用/索引呢?

例如,

假设我在聊天室有3个用户。我将rpc表示为(也可能是服务器流)

rpc Chat(stream ChatRequest) returns (stream ChatResponse) {}

例如,一个用户向该组发送一条消息,该组需要向另两个用户发送。因此,如果我需要通过当前为这些用户打开的流发送它,那么保存流的引用有多安全。

实施将像...

type chatServiceServer struct {
    // keep a map of subscribers / users currently connected; protect with mutex
}

func (s *chatServiceServer) Chat(stream pb.ChatService_ChatServer) error {
    // md, ok := metadata.FromIncomingContext(stream.Context())
    // p, ok := peer.FromContext(ctx)
    // ... identify client from above

    for {
        // save the message to DB
        // find other users in the chatroom is currently connected
        // if so, stream.Send(m)
        // else notify ....
    }
}

但是,我看到API文档中的警告,想知道更好的方法。

https://godoc.org/google.golang.org/grpc#ServerStream
// It is safe to have a goroutine calling SendMsg and another goroutine
// calling RecvMsg on the same stream at the same time, but it is not safe
// to call SendMsg on the same stream in different goroutines.
SendMsg(m interface{}) error

任何订阅(事件等)都会出现类似的用例,并且需要根据客户id进行通知。任何示例代码、文章都会很好。

非常感谢。

共有1个答案

韶云瀚
2023-03-14

将流保存到其他地方使用应该是安全的,但是您只能在单个goroutine中调用SendMsg(对于RecvMsg来说,同样的限制是独立的)。所以这意味着如果您在方法处理程序中这样做:

for {
  if err := stream.Recv(req); err != nil {
    return err
  }
  for _, s := range allStreams[req.ID] {
    if err := s.Send(req.Message); err != nil { /* remove s from allStreams */ }
  }
}

然后对s.Send的调用必须由锁保护,因为这些处理程序中有多个可能同时运行。(另外,allStreams被假定为一个map[ID][]流,在这种情况下,它还必须由锁保护。)

 类似资料:
  • 全部的 我正在处理一个看似简单的案例,但它提出了一些设计挑战: 有一个带有客户端的本地参与者系统,它可以连接到运行大部分业务逻辑的远程系统。 远程系统将有一个固定的IP地址、端口等-因此,可以使用context.actorSelection(uri)策略来获取ActorRef,以获取当前参与者(或路由器后面的一组路由器)的化身。 作为服务器的远程系统不应该知道客户端的位置。 鉴于此,将消息从客户端

  • 我正在使用Datagramsocket和DatagramPacket制作一个简单的server_client应用程序。我想做的是:一个客户端将数据发送到服务器,服务器将这些数据发送到另一个客户端。问题是服务器从第一个客户端接收数据,但不将它们发送到另一个客户端,我怎么知道我将发送到的客户端的端口?端口没有改变吗? 这是客户端类: 服务器类: }

  • 问题内容: 好吧,我正在尝试使用SocketServer构建一个小型python prgram,该服务器应该将其接收的消息发送给所有连接的客户端。我被困住了,我不知道如何在服务器端存储客户端,也不知道如何发送到多个客户端。哦,我的程序每次连接1个以上的客户端时失败,并且每次客户端发送的1条以上消息都失败… 到目前为止,这是我的代码: 显然,我不知道自己在做什么,所以任何帮助都将非常有用。 提前致谢

  • 我使用的是“流式RPC”API,其中MyRequests和MyResponse都是流式传输的 下面是一个稍微简化的类,它封装了一个gRPC流; 问题 为什么需要在send方法中同步对myStream的访问?我想理解为什么我必须同步那些希望在同一个流上并行发送无序请求的线程。如果每个请求都打包在一个带有自己流id的HTTP2数据帧中,那么这只是gRPC客户端的Java实现所独有的吗 当线程从方法se

  • 我需要实现一个github授权,然后将接收到的数据(JSON)发送到客户端。 我找到了这篇教程http://shiya.io/how-to-do-3-legged-oauth-with-github-a-general-guide-by-example-with-node-js/ 在该教程中,开发人员向我们展示了如下内容:“/”->“/login”->“/redirect”->“/user”(此处

  • 在客户端,很容易为服务器添加元数据: 以上内容可以在服务器上访问,如下所示: 现在,如果我们需要将元数据从服务器发送到客户端,我们会这样做: 请注意,我们正在传递错误,实际响应为null。 如何传递错误和非空响应以及元数据? 如果我执行以下操作,它似乎不起作用,因为没有错误就无法访问客户端上的元数据。 gRPC规范是否明确禁止在没有错误时从服务器向客户端发送元数据? 此外,在我们讨论这个问题时,我