当前位置: 首页 > 知识库问答 >
问题:

GRPC异步响应流C#

欧阳君浩
2023-03-14

如何从处理程序外部为RPC生成流响应值?(特别是,从I观察者)我目前正在执行以下操作,但这会产生跨线程问题,因为AnRx观察者在RPC处理程序之间共享...

public override Task GetTicker(RequestProto request, ServerCallContext context)
{
    var subscription = AnRxObservable.Subscribe(value =>
    {
        responseStream.WriteAsync(new ResponseProto
        {
            Value = value
        });
    });

    // Wait for the RPC to be canceled (my extension method
    // that returns a task that completes when the CancellationToken
    // is cancelled)
    await context.CancellationToken.WhenCancelled();

    // Dispose of the buffered stream
    bufferedStream.Dispose();

    // Dispose subscriber (tells rx that we aren't subscribed anymore)
    subscription.Dispose();

    return Task.FromResult(1);
}

这段代码感觉不对劲...但是我看不到从RPC处理程序之外创建的共享源流式传输RPC响应的任何其他方式。

共有1个答案

贡俊
2023-03-14

一般来说,当您试图从推模型(IObservable)转换为拉模型(枚举要写入和写入的响应)时,您需要为消息提供一个中间缓冲区,例如blockingQueue。然后,处理程序主体可以是一个异步循环,尝试获取队列的下一条消息(最好以异步方式),并将其写入responseStream。

此外,请注意,gRPC API只允许您在任何给定时间有1个飞行中响应,而您的代码片段并不尊重这一点。因此,在开始另一次写入之前,需要等待WriteAsync()(这是需要中间队列的另一个原因)。

此链接可能有助于解释推与拉的范例:何时使用IEnumerable与IObservable?

 类似资料:
  • 本教程介绍如何使用 C++ 的 gRPC 异步/非阻塞 API 去实现简单的服务器和客户端。假设你已经熟悉实现同步 gRPC 代码,如gRPC 基础: C++所描述的。本教程中的例子基本来自我们在overview中使用的Greeter 例子。你可以在 grpc/examples/cpp/helloworld找到安装指南。 概览 gRPC 的异步操作使用CompletionQueue。 基本工作流如

  • 我想返回一个临时重定向,使用AsyncACK。 下面的“工作”(因为没有错误),但似乎不是异步的(它一次处理一个请求)。 这应该工作吗?如果我明确需要像https://jersey.github.io/documentation/latest/async.html#d0e9895一样启动一个新线程,返回响应是什么样子的?

  • gRPC是否支持以下服务器和客户端之间的通信方案?1.客户端连接到服务器并调用waitMessages等方法。然后,客户端正在等待来自服务器的一些消息。2.服务器不时向客户端发送消息。例如,调用客户端的某个方法。3.客户端与服务器断开连接 我不喜欢通过轮询服务器来实现这个方案。我想调用服务器方法一次,然后等待消息。

  • 问题内容: 就像一个人在这里问到但他的解决方案是调用其他函数 …我想知道是否有可能拥有一个不调用a的函数第二个功能基于异步请求的响应,但仅当异步请求响应时。 可能是这样的: 不调用另一个函数,这有可能吗? 我要实现的目标是拥有一个可以用一些参数调用的函数,该函数将返回异步Web服务(如FB)的响应。 问题答案: 简而言之,没有。您不能让异步函数同步返回有意义的值,因为该值当时不存在(因为它是在后台

  • 我理解同步服务器和异步服务器之间的区别,但是我想知道,如果有这两种情况,哪一种更适合异步服务器还是同步服务器? > 同步:写入调用将被阻塞,直到消息准备好从内部完成队列通过线路发送。异步:写入调用立即返回,我们需要等待完成队列。在同步服务器中,如果我们添加队列,该队列基本上为evry写入调用和其他线程填充,并将其耗尽并执行stream.write然后性能将相同? 同步:gRPC内部创建线程池,线程

  • 我试图为双向流API编写一个cpp客户端。 通过下面的客户端代码,我可以在服务器上实例化一个流观察器。但是,问题在于调用服务器StreamObserver上的onNext函数。