当前位置: 首页 > 知识库问答 >
问题:

如何在grpc中正确设计发布订阅模式?

申屠弘图
2023-03-14

我正在尝试使用grpc实现pub-sub模式,但我对如何正确地实现它有点困惑。

我的协议:rpc调用(google.protobuf.Empty)返回(流数据)

客户:

asynStub.call(Empty.getDefaultInstance(), new StreamObserver<Data>() {
         @Override
         public void onNext(Data value) {
           // process a data

         @Override
         public void onError(Throwable t) {

         }

         @Override
         public void onCompleted() {

         }
       });

   } catch (StatusRuntimeException e) {
     LOG.warn("RPC failed: {}", e.getStatus());
   }

   Thread.currentThread().join();

服务器服务:

public class Sender extends DataServiceGrpc.DataServiceImplBase implements Runnable {
  private final BlockingQueue<Data> queue;
  private final static HashSet<StreamObserver<Data>> observers = new LinkedHashSet<>();

  public Sender(BlockingQueue<Data> queue) {
    this.queue = queue;
  }

  @Override
  public void data(Empty request, StreamObserver<Data> responseObserver) {
    observers.add(responseObserver);
  }

  @Override
  public void run() {
    while (!Thread.currentThread().isInterrupted()) {
      try {
        // waiting for first element
        Data data = queue.take();
        // send head element
        observers.forEach(o -> o.onNext(data));

      } catch (InterruptedException e) {
        LOG.error("error: ", e);
        Thread.currentThread().interrupt();
      }
    }
  }
}

如何正确地从全局观察员中删除客户端?当连接中断时,如何接收某种信号
如何管理客户端-服务器重新连接?如何在连接断开时强制客户端重新连接?

提前感谢!

共有1个答案

叶举
2023-03-14

在实施您的服务时:

  @Override
  public void data(Empty request, StreamObserver<Data> responseObserver) {
    observers.add(responseObserver);
  }

您需要获取当前请求的上下文,并侦听取消。对于单请求、多响应调用(也称为服务器流),gRPC生成的代码被简化为直接传入请求。这意味着您不能直接访问底层的服务器调用。侦听器,这是您通常侦听客户端断开连接和取消连接的方式。

相反,每个gRPC调用都有一个与之关联的上下文,它携带取消和其他请求范围的信号。对于您的情况,您只需要通过添加自己的侦听器来侦听取消,然后从链接的哈希集中安全地删除响应观察器。

至于重新连接:如果连接断开,gRPC客户端将自动重新连接,但通常不会重试RPC,除非这样做是安全的。在服务器流式RPC的情况下,这样做通常不安全,因此您需要直接在客户端上重试RPC。

 类似资料:
  • Redis 发布订阅(pub/sub)是一种消息通信模式:发送者(pub)发送消息,订阅者(sub)接收消息。 Redis 客户端可以订阅任意数量的频道。 下图展示了频道 channel1 , 以及订阅这个频道的三个客户端 —— client2、client5 和 client1 之间的关系: 当有新消息通过 PUBLISH 命令发送给频道 channel1 时, 这个消息就会被发送给订阅它的三个

  • 简介 Redis 的列表类型键可以用来实现队列,并且支持阻塞式读取,所以 Redis 能够非常容易的实现一个高性能的优先队列。同时在更高层面上,Redis 还支持“发布/订阅”的消息模式,可以基于此构建一个聊天系统。 发布示例 发布(Publish)即将消息发布到频道中。示例代码: // 发送消息 Redis::publish('chan-1', 'Hello, World!'); // 发送消息

  • 本文向大家介绍JavaScript设计模式之观察者模式(发布者-订阅者模式),包括了JavaScript设计模式之观察者模式(发布者-订阅者模式)的使用技巧和注意事项,需要的朋友参考一下 观察者模式( 又叫发布者-订阅者模式 )应该是最常用的模式之一. 在很多语言里都得到大量应用. 包括我们平时接触的dom事件. 也是js和dom之间实现的一种观察者模式. 只要订阅了div的click事件. 当点

  • 问题内容: 我是Redis pub / sub的新手。我在系统中有一个聊天功能,就像IM。所以我想使用redis pub / sub。在我检查了样本之后,大多数样本都是基于聊天室设计的。在我的系统中,我将在多个用户之间建立多个聊天室,例如: 所以,上面的几行是房间。我已经用如下的node.js实现了服务器; 如您所见,我正在为每个连接创建一个新的Redis订户。在其他聊天室示例中,redis订户客

  • 我需要在发布/订阅模式下调用Kafka消费者1000次。据我所知,为了让kafka在发布/订阅模式下工作,我需要给每个消费者一个新的groupId(props . put(" group . id ",String.valueOf(Instant.now())。toEpochMilli()));).但是当我这样做的时候,如果两个消费线程同时访问消费线程,就会出现问题。这个问题应该怎么解决?

  • 本文向大家介绍JavaScript设计模式之观察者模式与发布订阅模式详解,包括了JavaScript设计模式之观察者模式与发布订阅模式详解的使用技巧和注意事项,需要的朋友参考一下 本文实例讲述了JavaScript设计模式之观察者模式与发布订阅模式。分享给大家供大家参考,具体如下: 学习了一段时间设计模式,当学到观察者模式和发布订阅模式的时候遇到了很大的问题,这两个模式有点类似,有点傻傻分不清楚,