当前位置: 首页 > 知识库问答 >
问题:

ReactiveX在超时后发出空值或哨兵值

宰父飞翼
2023-03-14

寻找一种干净的方法来将源代码可观察转换为在一段时间内不发出项后发出单个null(或哨兵值)。

例如,如果源observable发出1,2,3,然后在发出4,5,6之前停止发出10秒,我希望发出的项为1,2,3,null,4,5,6

我研究了timeout运算符,但是当超时发生时,它会终止可观察的,这是不希望的。

使用RXJava。

共有1个答案

乔伯寅
2023-03-14

根据Akarnokd的答复和一个类似问题的答复,另一种实现方式是:

如果您正在寻找一个单一的值来指示排放之间的时间间隔:

final TestScheduler scheduler = new TestScheduler();
final TestSubject<Integer> subject = TestSubject.create(scheduler);
final TestSubscriber<Integer> subscriber = new TestSubscriber<>();

final long duration = 100;
final Observable<Integer> timeout = Observable.just(-1).delay(duration, TimeUnit.MILLISECONDS, scheduler)
    .concatWith(Observable.never())
    .takeUntil(subject)
    .repeat();

subject.mergeWith(timeout).subscribe(subscriber);

subject.onNext(1,   0);
subject.onNext(2, 100);
subject.onNext(3, 200);

scheduler.advanceTimeBy(200, TimeUnit.MILLISECONDS);
scheduler.advanceTimeBy(300, TimeUnit.MILLISECONDS);

subject.onNext(4,   0);
subject.onNext(5, 100);
subject.onNext(6, 200);

scheduler.advanceTimeBy(200, TimeUnit.MILLISECONDS);

subscriber.assertNoTerminalEvent();
subscriber.assertReceivedOnNext(Arrays.asList(1, 2, 3, -1, 4, 5, 6));

如果您希望在可观察到的源在一段时间内不发射后持续接收值:

final TestScheduler scheduler = new TestScheduler();
final TestSubject<Integer> subject = TestSubject.create(scheduler);
final TestSubscriber<Integer> subscriber = new TestSubscriber<>();

final long duration = 100;
final Observable<Integer> timeout = Observable.interval(duration, duration, TimeUnit.MILLISECONDS, scheduler)
    .map(x -> -1)
    .takeUntil(subject)
    .repeat();

subject.mergeWith(timeout).subscribe(subscriber);

subject.onNext(1,   0);
subject.onNext(2, 100);
subject.onNext(3, 200);

scheduler.advanceTimeBy(200, TimeUnit.MILLISECONDS);
scheduler.advanceTimeBy(300, TimeUnit.MILLISECONDS);

subject.onNext(4,   0);
subject.onNext(5, 100);
subject.onNext(6, 200);

scheduler.advanceTimeBy(200, TimeUnit.MILLISECONDS);

subscriber.assertNoTerminalEvent();
subscriber.assertReceivedOnNext(Arrays.asList(1, 2, 3, -1, -1, -1, 4, 5, 6));

以上所有内容都使用Java1.8.0_72在RXJava1.0.17中进行了测试。

 类似资料:
  • //在下面的程序中,在内部,当用户收到指令时,他可以简单地进入哨兵退出while循环,但是,当您得到希望购买的票的结果时,按999并不退出程序

  • 主要内容:一、哨兵,二、源码分析,三、总结一、哨兵 Sentinel(哨兵),听名字大家都应该想得到这个家伙是做什么的。在redis的应用中,有单机模式、主从模式、哨兵模式和集群模式,其实你从它的发展就可以看出来,redis是从一个简单的应用开始,不断的壮大,从单点到分布式,从简单的主从备份以及初始的哨兵监控,再到可以看成把二者合成的集群模式,除了是应用场景的变化,更多的是为了提高安全性和高可用性。网上有很多人问哨兵和集群有啥不一样,其实

  • 每一个哨兵都可以连接到我的主人,并可以看到奴隶。它们能够独立地检测主从是否倒下。问题是哨兵们无法探测到对方。 我已经验证了每个哨兵都像预期的那样向通道发布消息,但似乎没有一个哨兵真正从其他哨兵通道接收消息。 我怎么让哨兵们见面?

  • Redis 哨兵(Sentinel)是 Redis 的高可用性(Hight Availability)解决方案:由一个或多个 Sentinel 实例组成的 Sentinel 系统可以监视任意多个主服务器,以及这些主服务器的所有从服务器,并在被监视的主服务器进入下线状态时,自动将下线主服务器的某个从服务器升级为新的主服务器,然后由新的主服务器代替已下线的主服务器继续处理命令请求。 Sentinel

  • redis大师可以通过sentinel发现,使用: 现在要将数据写入主节点:

  • 但是,当我通过redis-cli-p26379连接并输入“config get maxclients”时,它告诉我没有这样的命令。但是,如果我连接到实际的Redis实例,它就可以工作。 如何获取redis sentinel的maxclients值?