当前位置: 首页 > 知识库问答 >
问题:

RxJS CombineTest不等待源观测值发射?

喻昀
2023-03-14

我有两个源观测值,当一个源观测值发出时,我需要计算一些数据。我试图使用combineAll()操作符,但它只在每个源观测值第一次发出时发出一个值。

是否有任何运算符类似于compineAll(),一旦任何源可观测对象第一次发出,就立即发出?如果没有,最清晰的方法是什么?

我所尝试的:

const source1$ = service.getSomeData();
const source2$ = service.getOtherData();

combineLatest(
  source1$,
  source2$
).pipe(
  map([source1Data, source2Data] => {
    // this code only gets executed when both observables emits for the first time
    return source1Data + source2Data;
  })
)

共有2个答案

张宣
2023-03-14

一种方法是在所有源前面加上startWith

combineLatest([
  source1$.pipe(startWith(?)),
  source2$.pipe(startWith(?)),
])

当任何源观测物第一次发射时,它就会发射吗?

看起来您可能正在寻找race(source1$,source2$)可观察的创建方法,或者可能只是merge(source1$,source2$)。管道(取(1))。但这取决于你想做什么。

公羊瀚
2023-03-14

如果我理解正确,您需要一个如下图所示的模式:

stream1$ => ------ 1 ------ 12 -----------------------
stream2$ => ------------------------- 30 -------------

result$  => ------ 1 ------ 12 ------ 42 --------------

如果有一个值可用,则发出该值。如果两者都可用,则发出两者的组合,在这种情况下为简单和(12 30=42);

首先是输入流,为了这个例子,我把它们作为主题,所以我们可以手动推送数据:

const stream1$ = new Subject();
const stream2$ = new Subject();

接下来,我们将组合输入,首先通过startWith操作符进行管道传输。这确保了CombineTest生成一个可立即发射的可观测值——准确地说是[null,null]

const combined$ = combineLatest(
  stream1$.pipe(startWith(null)),
  stream2$.pipe(startWith(null)),
);

现在您有了一个observable,它总是发出长度为2的数组,包含您的数据(本例中的数字)和null的任意组合,如下图所示:

stream1$ | startWith(NULL) => NULL ----------- 1 ----------- 12 ----------------------------
stream2$ | startWith(NULL) => NULL ---------------------------------------- 30 -------------

combined$                     [NULL, NULL] --- [1, NULL] --- [12, NULL] --- [12, 30] -------

最后,您可以检查并将该输出映射为所需格式:两个数字的总和(如果两个数字都可用),或第一个可用值:

const processedCombinations$ = combined$.pipe(
  map(([data1, data2]) => {
    if (data1 === null) return data2;
    if (data2 === null) return data1;

    return data1 + data2;
  }),
);

结果:

combined$                  => [NULL, NULL] --- [1, NULL] --- [12, NULL] --- [12, 30] -------
processedCombinations$     => NULL ----------- 1 ----------- 12 ----------- 42 -------------

还有一个问题:从组合$发出的第一个值是[null,null],导致处理组合$最初发出null。解决此问题的一种方法是使用skipWhile将另一个管道连接到processedcompositions$

const final$ = processedCombinations$.pipe(skipWhile((input) => input === null));

结果:

combined$                  => [NULL, NULL] --- [1, NULL] --- [12, NULL] --- [12, 30] -------
processedCombinations$     => NULL ----------- 1 ----------- 12 ----------- 42 -------------
final$                     => ---------------- 1 ----------- 12 ----------- 42 -------------

另一种更好的方法是在创建处理组合$(现在实际上是最终$)之前过滤组合$流:

const combinedFiltered$ = combined$.pipe(
    filter(([first, second])=> first !== null || second !== null),
);

const final$ = combinedFiltered$.pipe(
    map(([data1, data2]) => {
        if (data1 === null) return data2;
        if (data2 === null) return data1;

        return data1 + data2;
    }),
);

相应的图表很好地显示了如何尽可能早地在流层次结构中消除不相关的值:

combined$                  => [NULL, NULL] --- [1, NULL] --- [12, NULL] --- [12, 30] -------
combinedFiltered$          => ---------------- [1, NULL] --- [12, NULL] --- [12, 30] -------
final$                     => ---------------- 1 ----------- 12 ----------- 42 -------------

上面的图表可以用这个代码生成:

final$.subscribe(console.log);

stream1$.next(1);
// logs: 1

stream1$.next(12);
// logs: 12

stream2$.next(30);
// logs: 42

使用的进口:

import { combineLatest, Subject } from 'rxjs';
import { filter, map, skipWhile, startWith } from 'rxjs/operators';

 类似资料:
  • 我有一个加载页面,我想在其中对不相关的信息执行两个网络请求(rxjava)。在这两个请求完成之前,我不想进入下一页,即使其中一个或两个都失败了。 > 使用zip将请求绑定在一起。有没有办法不必被迫使用双函数,也不必返回null? 请求A和B有一个。doOnNext和。多恩。如果其中一个返回错误,zip observable是否继续?zip订阅服务器是否也会返回错误? 这是最好的方法吗?

  • 我希望能够等待一个可观察的时间,例如。 天真的尝试会导致等待立即解析,而不会阻止执行 编辑:我的全部预期用例的伪代码是: 我知道我可以将其他代码移动到另一个单独的函数中,并将其传递到subscribe回调中,但我希望能够避免这种情况。

  • 我正在尝试将数据库调用移出控制器,以清理并使其可测试。当它们在控制器中时,一切都会顺利进行。我将它们移出控制器,并添加了一个异步,以确保我们等待。否则,我将调用的中的函数。现在,一旦我使用async/await,控制器中的函数就会认为没有用户,因为它没有等待。 有几个关于异步等待的SO问题,但我没有找到一个解决我的问题。我确实验证了返回了我的用户,并添加了控制台日志来显示路径。 节点猫鼬异步等待似

  • 问题内容: 如何更改以下代码,以触发两个异步操作并有机会同时运行? 我需要做这样的事情吗? 问题答案: TL; DR 不要在获得承诺的问题中使用模式,而是分别等待它们;而是使用(至少现在): 虽然您的解决方案 确实 并行运行这两个操作,但如果两个诺言都被拒绝,它就无法正确处理拒绝。 细节: 您的解决方案并行运行它们,但始终等待第一个完成,然后再等待第二个。 如果您只想启动它们,并行运行它们,并获得

  • 问题内容: 我如何更改以下代码,以触发两个异步操作并有机会同时运行? 我需要做这样的事情吗? 问题答案: TL; DR 不要在获得承诺的问题中使用模式,而是分别等待它们;而是使用(至少现在): 虽然您的解决方案确实并行运行这两个操作,但是如果两个诺言都被拒绝,它就无法正确处理拒绝。 细节: 您的解决方案并行运行它们,但始终等待第一个完成,然后再等待第二个。如果您只想启动它们,并行运行它们,并获得两

  • 我必须定期轮询一些RESTfulendpoint以刷新android应用程序的数据。我还必须根据连接情况暂停并恢复(如果手机处于脱机状态,甚至无需尝试)。我当前的解决方案正在运行,但它使用标准Java的ScheduledExecutorService来执行定期任务,但我想继续使用Rx范式。 这是我当前的代码,为了简洁起见,跳过了部分代码。 网络状态可观察基本上是一个包裹在可观察中的广播接收器 正如