当前位置: 首页 > 知识库问答 >
问题:

RXJava-组合最新而不丢失任何结果

金皓君
2023-03-14

我想把两个观测值结合起来,一个发射n个项目,另一个只发射1个。

组合测试(CombineTest)将等待,直到两个可观察对象都至少发出一个项目,然后组合最新发出的项目,直到两个可观察对象都完成。按以下时间顺序考虑:

  • 可观测A-

组合测试(CombineTest)只会将可观察到的1的结果2与可观察到的2的结果1进行组合(可以在此处轻松测试:http://rxmarbles.com/#combineLatest).

我需要什么

我需要合并两个可观测的所有项目,无论哪一个更快。我该怎么做?

结果应该是(总是,独立的观察开始发射项目第一!):

  • A1与B1组合
  • A2结合B1

共有2个答案

魏宸
2023-03-14

注意:这是未经测试的:

从可观察序列a创建一个Replaysubject。对于序列b上发出的每个值,将该值与重放主题组合以创建新的可观察的Pair

public static <A, B> Observable<Pair<A, B>> permutation(
    Observable<A> observableA, 
    Observable<B> observableB, 
) {
    ReplaySubject<A> subjectA = ReplaySubject.create();
    observableA.subscribe(subjectA::onNext);
    return observableB.flatMap(b -> subjectA.map(a -> Pair.of(a, b)));
}

柴星津
2023-03-14

老问题,但我遇到了同样的问题。这是我的想法。首先,非工作版本:

    Observable<Integer> emitsMany = Observable.range( 1, 10 )
            .concatMap( i -> Observable.just( i ).delay( 1, TimeUnit.SECONDS ))
            .doOnNext( i -> System.out.println( "produced " + i ));

    Observable<Boolean> emitsOne = Observable.just( true )
            .delay( 3, TimeUnit.SECONDS )
            .doOnNext( b -> System.out.println( "produced " + b ));

    Observable.combineLatest(
            emitsMany, emitsOne,
            ( i, b ) -> "consumed " + i + " " + b )
    .blockingSubscribe( System.out::println );

果然,许多排放物的前几次排放量都会下降:

produced 1
produced 2
produced 3
produced true
consumed 3 true
produced 4
consumed 4 true
. . .

我想这是解决办法。。首先,我们需要将emitsOne打包成一个可以毫不延迟地继续返回先前观察到的值的东西。我不知道有哪个操作符能做到这一点,但行为主体(BehaviorSubject)能做到这一点。

接下来,我们可以将concatMap与嵌套的take(1)Observable一起使用:

    Observable<Integer> emitsMany = Observable.range( 1, 10 )
            .concatMap( i -> Observable.just( i ).delay( 1, TimeUnit.SECONDS ))
            .doOnNext( i -> System.out.println( "produced " + i ));

    Observable<Boolean> emitsOne = Observable.just( true )
            .delay( 3, TimeUnit.SECONDS )
            .doOnNext( b -> System.out.println( "produced " + b ));

    BehaviorSubject<Boolean> emitsOneSubject = BehaviorSubject.create();
    emitsOne.subscribe( emitsOneSubject::onNext );

    emitsMany.concatMap( i -> emitsOneSubject
            .take( 1 )
            .map( b -> "consumed " + i + " " + b ))
    .blockingSubscribe( System.out::println );

我们现在得到了所有的组合:

produced 1
produced 2
produced true
consumed 1 true
consumed 2 true
produced 3
consumed 3 true
produced 4
consumed 4 true
produced 5
consumed 5 true
produced 6
consumed 6 true
produced 7
consumed 7 true
produced 8
consumed 8 true
produced 9
consumed 9 true
produced 10
consumed 10 true
 类似资料:
  • 问题内容: 我想使用JavaScript强制文本框的值小写。我已经尝试过下面的代码,但是每次您按一个键,光标就会跳到输入的末尾。如何避免这种情况? 问题答案: $(“#beLowerCase”).on(‘input’, function(){ 这实际上也适用于CSS: 服务器可以照顾实际的下壳体…

  • 我们正在BigQuery表上进行流式插入。 我们希望在不更改表名的情况下更新表的模式。 例如,我们希望删除一列,因为它包含敏感数据,但是我们希望保持所有其他数据和表名不变。 我们的流程如下: 将原始表复制到临时表 删除原始表 使用原始表名和新架构创建新表。 用旧表的数据填充新表。 哭,因为最后(最多)90分钟的数据卡在流缓冲区中,未传输 如何避免最后一步?

  • 问题内容: 我正在使用MySQL,但是我认为这是一个基本的SQL问题。 除了举个例子,我不知道还有什么要问的。 假设我的表格中有以下数据: 如何获得单个Foreign_key的每个密钥的最新信息? 例如,假设我想要4的Foreign_key最新,那么我想要的结果将是: 我将使用什么SQL来达到这个结果? 顺便说一句,我意识到这并不是大多数人选择存储数据的第一种方式,但是我有自己的理由。 即,这些值

  • 我试图以CSV格式保存py spark . SQL . data frame . data frame(也可以是其他格式,只要它易于阅读)。 到目前为止,我找到了几个示例来保存DataFrame。然而,每次我编写它时,它都会丢失信息。 数据集示例: 为了将这个文件保存为CSV,我首先尝试了这个解决方案: 不幸是,这导致了以下错误: 这就是我尝试另一种可能性的原因,将spark数据帧转换成panda

  • 我试图在使用RESTendpoint的骆驼路由中构建一个分割/聚合模式。它需要一个包含请求详细信息列表的请求对象。我想并行处理请求详细信息,然后将聚合结果返回给调用方。我希望这是一个同步调用。 这是我的路线中的代码。 我希望调用的结果是聚合调用(我的响应对象)的输出。但我实际上得到的是REST调用返回的请求对象?? 当我放入更多的日志语句时,我可以看到Split调用正在触发多个线程,这很好。我可以

  • 根据Hazelcast官方文档,从版本3.8开始支持滚动升级。 如果我的服务器版本是3.5,是否有办法创建一个成功的集群,其中新的盒子运行较新版本的Hazelcast? 天真地升级到3.6。*导致了两个不同的集群(旧的盒子仍然运行3.5,而另一个新的盒子运行3.6,显然没有数据,因为它永远无法与现有的盒子接触)。 我的部署过程如下: 创建一组新框 我的想法是在磁盘/数据库上存储快照,并在推出时从数