当前位置: 首页 > 知识库问答 >
问题:

为什么我的RxJava Observable只向第一个消费者发射?

连昊天
2023-03-14

有人能解释一下为什么下面的测试失败了吗?

public class ObservableTest {
    @Test
    public void badObservableUsedTwiceDoesNotEmitToSecondConsumer() {
        // Any simpler observable makes the test pass
        Observable<Integer> badObservable = Observable.just(1)
                .zipWith(Observable.just(2), (one, two) -> Observable.just(3))
                .flatMap(observable -> observable);

        ObservableCalculator calc1 = new ObservableCalculator(badObservable);
        ObservableCalculator calc2 = new ObservableCalculator(badObservable);

        // zipping causes the failure
        // Calling calculate().toBlocking().subscribe() on each calc passes
        // Observable.from(listOfCalcs).flatMap(calc -> calc.calculate()) passes
        Observable.zip(ImmutableList.of(calc1.calculate(), calc2.calculate()), results -> results)
                .toBlocking()
                .subscribe();

        assertThat(calc1.hasCalculated).isTrue();
        assertThat(calc2.hasCalculated).isTrue(); // this fails
    }

    private static class ObservableCalculator {
        private final Observable<?> observable;

        public boolean hasCalculated = false;

        public ObservableCalculator(Observable<?> observable) {
            this.observable = observable;
        }

        public Observable<Void> calculate() {
            return observable.concatMap(o -> {
                hasCalculated = true;
                // returning Observable.just(null) makes the test pass
                return Observable.empty();
            });
        }
    }
}

我试图进一步简化“不好”的观察结果,但找不到任何可以删除的东西来简化它。

然而,我目前的理解是,它是一个可观察的(不管它是如何构造的),应该发出一个值,然后完成。然后,我们基于该可观察对象制作两个类似的对象实例,并在那些使用可观察对象的对象上调用一个方法,记下已经这样做了,然后返回Observable.empty()。

有人能解释为什么使用这个可观察的会导致测试失败(当使用更简单的可观察的会导致测试通过时)吗?

也可以通过串行调用calculate()使测试通过。toBlocking()。subscribe()而不是使用zip,或使计算返回可见。只是(空)而已。这对我来说有一定的意义(如果calc1为空,zip将不会订阅calc2,因为在这种情况下zip永远不会产生任何结果),但没有完全的意义(我不明白为什么zip在badObservable的更简单版本中不会这样做——calculate()方法仍然返回空,而不管输入是什么)。

共有1个答案

步衡
2023-03-14

如果你用某个东西压缩一个空的源,操作员会检测到它不能再产生任何值,并取消订阅它的所有源。有一个混合的zip和合并涉及和合并认真对待取消订阅:它不发出值3在所有因此concatMap不调用映射函数的第二个源。

 类似资料:
  • 我刚刚开始玩弄《Spring-Cloud-Stream》中的Kafka活页夹。 我配置了一个简单的消费者: 但当我启动应用程序时,我看到在启动日志中创建了三个独立的消费者配置: 我发现这些配置之间唯一不同的是客户机。id。 除此之外,我不知道为什么只有一个消费者有三种配置。 是因为我也在运行吗? 这是我的:

  • 我对Kafka很陌生。我使用的是Kafka0.9.0.0客户端for Java。在使用特定主题的数据时,当我启动生产者-消费者java项目时,我每次都得到相同的消息(这是第一次发布的消息)。 我的要求是生成一些消息并使用它,检查两个消息是否相同。 有谁能给我指点一下吗?

  • 在Apache Kafka 0.8.2 office文档的第5.6节“分销、消费者和消费者群体”小节中,它说 组中的使用者尽可能公平地划分分区,每个分区仅由一个消费组中的一个使用者使用。 但是我发现,在实践中,一个消费者组中的多个消费者可以通过从同一主题分区发送 FetchRequest 来使用单个分区中的数据。 在接下来的消费者身份证登记处小节中 除了由一个组中的所有使用者共享的group_id

  • 我们启动一个Kafka消费者,监听一个可能还没有创建的主题(不过,主题自动创建是启用的)。 此后不久,一位制作人发表了关于这个话题的消息。 Kafka原木

  • 问题内容: 我有一个JMS客户端,它正在生成消息并通过JMS队列发送到其唯一的使用者。 我想要的是不止一个消费者收到这些消息。我想到的第一件事是将队列转换为主题,以便现有用户和新用户都可以订阅并将相同的消息传递给他们。 显然,这将涉及在生产者和消费者方面修改当前的客户代码。 我还要查看其他选项,例如创建第二个队列,这样就不必修改现有的使用者。我相信这种方法有很多优点,例如(如果我错了,请纠正我)在

  • 当一个组中只有一个消费者,并且认为消费者无法在session.time.out内进行轮询时,将触发重新平衡,但是在这种情况下,组中只有一个消费者,现在假设session.time.out是30秒和消费者民意调查后50秒组协调员将识别消费者后50秒,并允许它提交偏移或协调员将断开消费者和没有偏移得到提交,并将重新平衡消费者与新的消费者标识?如果上次提交的偏移量是345678,在下一次轮询中,它处理了