当前位置: 首页 > 知识库问答 >
问题:

gRPC Java客户端能否在长期存在的gRPC流上并行发送多个请求,以及如何管理N个流

公良高刚
2023-03-14

我使用的是“流式RPC”API,其中MyRequests和MyResponse都是流式传输的

service MyStreamedService {
  rpc myOperation(**stream** MyRequest) returns (**stream** MyResponse)
}

下面是一个稍微简化的类,它封装了一个gRPC流;

public class MyStreamWrapper implements StreamObserver<MyResponse> {
  public MyStreamWrapper(ManagedChannel myChannel) {
    myStub = MyStreamedServiceGrpc.newStub(myChannel);
    // create a stream and maintain a long lived reference to the stream via StreamObserver's
    myStream = myStub.myOperation(this);
  }
  
  @Override
  public void onNext(MyResponse r) {
    // handle the response (not shown)
  }
  
  @Override
  public void onError(Throwable t) {
    // very unfortunate that there is no error code in this API !
    // throttle (not shown but if I don't throttle, eats CPU)
    // Create a new stream
    myStream = myStub.myOperation(this);
  }
  
  @Override
  public void onCompleted() {
    // server has called StreamObserver<MyRequest>.onCompleted
    // Create a new stream using the async API
    myStream = myStub.myOperation(this);
  }
  
  // Context: many threada that want to send a request asynchronously
  public void send(MyRequest r) {
    synchronized(myStream) {
      myStream.onNext(r);
    }
  }
}

问题

  1. 为什么需要在send方法中同步对myStream的访问?我想理解为什么我必须同步那些希望在同一个流上并行发送无序请求的线程。如果每个请求都打包在一个带有自己流id的HTTP2数据帧中,那么这只是gRPC客户端的Java实现所独有的吗
  2. 当线程从方法send返回时,保证会发生什么
  • 请求缓冲在gRPC客户端中...我想这必须是最小值
  • 请求作为HTTP2流帧在线路上?
  • 代理收到请求了吗?
  • 服务器已收到但未处理?
  • 服务器是否收到并发送了响应?

我认为我理解的事情...

  • gRPC通道是子通道的集合,gRPC负责创建从客户端通过不同子通道到每个服务器的网络连接
  • 一个gRPC流通过一个子通道连接到每个服务器,请求按顺序处理。
  • gRPC流基于HTTP2流,它是一系列具有相同流id的帧。
  • 我也见过一个gRPC流被描述为一个单一的gRPC调用,并理解为每个请求使用一个新流是很便宜的,但是在我上面的工作示例中,它也可以是长寿的(或者至少gRPCJava客户端提供了一个让它看起来长寿的对象)
  • "gRPC的核心是流式传输。Unary(单一请求,单一响应)和服务器流式传输(单一请求)只是生产更干净的API或更优化的I/O行为的特殊情况。但是在网上,一切看起来都和流式传输一样。"埃里克·安德森在《gRPC客户端流式传输是如何实现的》中的回答帮助我理解了为什么Unary应用编程接口在服务器端包括流观察者。

共有2个答案

沈俊明
2023-03-14

事实证明,所有答案都可以在HTTP/2规范中找到https://datatracker.ietf.org/doc/html/rfc7540#这是非常可读的。

  1. Eric Anderson回答说:“请注意,对象不是线程安全的,所以如果您同时从多个线程调用对象上的方法,您应该自己进行锁定/同步。”。并没有真正解释为什么,我猜一元API不需要同步,所以它们没有同步流式API

一般评论:我发现gRPC文档很难理解,直到我意识到大多数更深层次的问题都可以通过研究HTTP2来回答,例如,HTTP2请求与proto IDL中的应用程序请求对象不符合1:1。关键点:HTTP2请求在流的持续时间内持续,可以携带许多应用程序请求对象。

左华灿
2023-03-14
  1. 首先,HTTP/2消息可能很大,所以在一般情况下,将HTTP/2帧放在导线上并不是原子的,所以多个线程可能会并发地写入同一个网络套接字。此外,还有与流控制相关的缓冲:如果流的另一边没有准备好接受更多消息,它们需要在您这边进行缓冲,所以可能会有多个线程并发地写入同一个缓冲区。
  2. 请求被缓冲在gRPC客户端是正确的答案:流观察者的方法是异步的,它们返回非常快,实际的网络通信将在某个时候由另一个线程执行。
  3. 在发送消息时,您应该使用来自CallStream观察者的方法来尊重对方的就绪状态:isReadyHandler()和setOnReadyHandler(...)(您可以始终将您的出站Stream观察者强制转换为一个CallStream观察者)。如果您忽略对方的就绪状态,gRPC将在其内部缓冲区中缓冲消息,直到对方就绪。不过,由于第1点中描述的缓冲,这可能会导致在某些情况下过度使用内存。
    BTW:您可能希望查看官方的复制与FlowControl()帮助器方法和我自己的DispatchingOnReadyHandler类。
  4. 我猜你的意图是总是打开一个RPC:如果是这样,你的代码看起来没问题。然而,问题是您是否应该使用单个bi-di调用而不是多个一元:如果服务器对1个请求消息的处理与其他请求消息的处理没有紧密联系(即:服务器不需要不断维护与所有请求消息相关的单个内存状态),那么一元调用会更好,至少有两个原因:
    4.1.不需要第1点中描述的同步。
    4.2.您将更好地利用服务器负载平衡。
    在正常情况下,启动开销新的一元RPC很小,因为它将在现有的HTTP/2连接上打开一个新的流。
    但是,如果服务器端对某些请求消息的处理可能与其他先前的请求消息有关,那么您确实需要客户端流。然而,您应该尽可能关闭并更新RPC,以允许服务器平衡流量。
  5. 如果您的客户端作为某个服务器应用程序的一部分运行,那么grpclb策略是最常见的负载平衡选择:它将维护多个HTTP/2连接到多个可用后端的集合,每个连接可能具有多个HTTP /2流(HTTP/2流与gRPC Streams对应1-1)。此外,grpclb将主动探测这些连接,以验证它们是否健康,并自动重新发出DNS查询(或任何其他名称解析服务,如果您使用自定义NameResolver),以查看是否在需要时添加了任何新的后端。如果您想要使用grpc-grpclb,请记住包含运行时依赖项。
    如果您的客户端在android上运行,那么grpclb通常不可用(大多数android设备缺乏运行后台负载均衡器的能力,即使可能,它也会很快耗尽设备的电池),但是您的连接通常会通过一些位于后端服务器前面的负载均衡器。在这种情况下,每个新的RPC通常会转到占用最少的后端。
    然而,由于您似乎只维护了一个长寿命的调用,所以您的所有请求消息都将转到同一个后端,直到调用被“续订”:这就是为什么我建议在可能的情况下使用一元调用。这比实现您自己的负载平衡要简单得多,所以如果可能的话,它应该是首选的。

关于“我理解的事情”部分的澄清:“子通道”通常基本上是HTTP/2连接:通道是可能多个HTTP/2连接(取决于客户端能力:见第5点)到多个后端(当然取决于服务器配置)的集合,每个连接可以有多个独立的流(每个gRPC调用一个HTTP/2流)。

在服务器应用到服务器应用gRPC的情况下,关于负载平衡和名称解析的一些注意事项:

  • 客户端查找可用后端的最简单也是最常见的方法是通过DNS解析。更复杂的机制包括xDS、Consor或customNameResolvers.
  • 特别是,当在同一个k8s集群中部署后端和客户端服务器应用程序时,最常见的方法是将后端部署为无头服务,这样客户端就可以通过集群内的DNS查询获得所有后端吊舱,查询内容为

 类似资料:
  • 我有这个代码: 我一直在犯这样的错误: java:不兼容的类型:com。应用句子分类请求。无法将生成器转换为com。应用句子分类请求 我已经使用Maven插件生成了gRPC Java文件。在看了多个例子后,我不确定我的问题是什么。

  • 问题内容: 首先,让我解释一下上下文: 我必须创建一个客户端,该客户端将发送许多HTTP请求以下载图像。这些请求必须是异步的,因为一旦完成图像,它将被添加到队列中,然后打印到屏幕上。由于图像可能很大且响应分块,因此我的处理程序必须将其聚合到缓冲区中。 因此,我遵循Netty示例代码(HTTP勺示例)。 目前,我有三个静态映射,用于为每个通道存储通道ID和缓冲区/块布尔值/我的最终对象。 在那之后,

  • 当我运行我的gRPC客户端,它试图将请求流式传输到服务器时,我收到了这个错误:"TypeError: has typelist_iterator,但期望其中之一:bytes, unicode" 我需要以某种方式对我发送的文本进行编码吗?错误消息有一定的意义,因为我肯定是在传入一个迭代器。我从gRPC留档中假设这是需要的。(https://grpc.io/docs/tutorials/basic/p

  • 我从使用go的GRPC开始。我阅读了官方文档,并举了几个例子。 在大多数示例中,您不识别客户端,而是使用流读取/写入数据。我看到上下文中有用于检索身份验证信息的应用编程接口,并且可以为聊天请求识别客户端。但是,如果我想根据客户端标识保留流的引用/索引呢? 例如, 假设我在聊天室有3个用户。我将rpc表示为(也可能是服务器流) 例如,一个用户向该组发送一条消息,该组需要向另两个用户发送。因此,如果我

  • 按照这里的Apache HttpAsyncClient示例,HTTPGET请求并不是一次性触发的,而是(大部分)同步触发的。 下图显示了请求的发送顺序(除了一个)。当增加请求数量时,这仍然是正确的。 我使用了另一个库(AsynHttpClient ),请求发送得更快,而且是随机的。 有什么办法可以改进这段代码,让它真正异步执行? 我添加了用于参考的代码。

  • 问题内容: 单个Servlet如何处理以用户请求形式出现的多个客户端请求?基于单例设计模式,我知道我们创建了一个servlet实例,但是单个servlet如何处理数百万个请求。对其所涉及的线程也感到困惑。 同样,这里提供了任何浏览器规范或设置,可用于跨请求发送请求或生成针对请求发送的线程。 所有框架都相同还是不同(例如,struts v / s springs)? 问题答案: Struts / S