我正在尝试使用grpc实现pub-sub模式,但我对如何正确地实现它有点困惑。
我的协议:rpc调用(google.protobuf.Empty)返回(流数据)
客户:
asynStub.call(Empty.getDefaultInstance(), new StreamObserver<Data>() {
@Override
public void onNext(Data value) {
// process a data
@Override
public void onError(Throwable t) {
}
@Override
public void onCompleted() {
}
});
} catch (StatusRuntimeException e) {
LOG.warn("RPC failed: {}", e.getStatus());
}
Thread.currentThread().join();
服务器服务:
public class Sender extends DataServiceGrpc.DataServiceImplBase implements Runnable {
private final BlockingQueue<Data> queue;
private final static HashSet<StreamObserver<Data>> observers = new LinkedHashSet<>();
public Sender(BlockingQueue<Data> queue) {
this.queue = queue;
}
@Override
public void data(Empty request, StreamObserver<Data> responseObserver) {
observers.add(responseObserver);
}
@Override
public void run() {
while (!Thread.currentThread().isInterrupted()) {
try {
// waiting for first element
Data data = queue.take();
// send head element
observers.forEach(o -> o.onNext(data));
} catch (InterruptedException e) {
LOG.error("error: ", e);
Thread.currentThread().interrupt();
}
}
}
}
如何正确地从全局观察员中删除客户端?当连接中断时,如何接收某种信号
如何管理客户端-服务器重新连接?如何在连接断开时强制客户端重新连接?
提前感谢!
在实施您的服务时:
@Override
public void data(Empty request, StreamObserver<Data> responseObserver) {
observers.add(responseObserver);
}
您需要获取当前请求的上下文,并侦听取消。对于单请求、多响应调用(也称为服务器流),gRPC生成的代码被简化为直接传入请求。这意味着您不能直接访问底层的服务器调用。侦听器,这是您通常侦听客户端断开连接和取消连接的方式。
相反,每个gRPC调用都有一个与之关联的上下文,它携带取消和其他请求范围的信号。对于您的情况,您只需要通过添加自己的侦听器来侦听取消,然后从链接的哈希集中安全地删除响应观察器。
至于重新连接:如果连接断开,gRPC客户端将自动重新连接,但通常不会重试RPC,除非这样做是安全的。在服务器流式RPC的情况下,这样做通常不安全,因此您需要直接在客户端上重试RPC。
Redis 发布订阅(pub/sub)是一种消息通信模式:发送者(pub)发送消息,订阅者(sub)接收消息。 Redis 客户端可以订阅任意数量的频道。 下图展示了频道 channel1 , 以及订阅这个频道的三个客户端 —— client2、client5 和 client1 之间的关系: 当有新消息通过 PUBLISH 命令发送给频道 channel1 时, 这个消息就会被发送给订阅它的三个
简介 Redis 的列表类型键可以用来实现队列,并且支持阻塞式读取,所以 Redis 能够非常容易的实现一个高性能的优先队列。同时在更高层面上,Redis 还支持“发布/订阅”的消息模式,可以基于此构建一个聊天系统。 发布示例 发布(Publish)即将消息发布到频道中。示例代码: // 发送消息 Redis::publish('chan-1', 'Hello, World!'); // 发送消息
本文向大家介绍JavaScript设计模式之观察者模式(发布者-订阅者模式),包括了JavaScript设计模式之观察者模式(发布者-订阅者模式)的使用技巧和注意事项,需要的朋友参考一下 观察者模式( 又叫发布者-订阅者模式 )应该是最常用的模式之一. 在很多语言里都得到大量应用. 包括我们平时接触的dom事件. 也是js和dom之间实现的一种观察者模式. 只要订阅了div的click事件. 当点
问题内容: 我是Redis pub / sub的新手。我在系统中有一个聊天功能,就像IM。所以我想使用redis pub / sub。在我检查了样本之后,大多数样本都是基于聊天室设计的。在我的系统中,我将在多个用户之间建立多个聊天室,例如: 所以,上面的几行是房间。我已经用如下的node.js实现了服务器; 如您所见,我正在为每个连接创建一个新的Redis订户。在其他聊天室示例中,redis订户客
我需要在发布/订阅模式下调用Kafka消费者1000次。据我所知,为了让kafka在发布/订阅模式下工作,我需要给每个消费者一个新的groupId(props . put(" group . id ",String.valueOf(Instant.now())。toEpochMilli()));).但是当我这样做的时候,如果两个消费线程同时访问消费线程,就会出现问题。这个问题应该怎么解决?
本文向大家介绍JavaScript设计模式之观察者模式与发布订阅模式详解,包括了JavaScript设计模式之观察者模式与发布订阅模式详解的使用技巧和注意事项,需要的朋友参考一下 本文实例讲述了JavaScript设计模式之观察者模式与发布订阅模式。分享给大家供大家参考,具体如下: 学习了一段时间设计模式,当学到观察者模式和发布订阅模式的时候遇到了很大的问题,这两个模式有点类似,有点傻傻分不清楚,