当前位置: 首页 > 知识库问答 >
问题:

RXJS等待数组中的所有可观察对象完成(或出错)

储俊英
2023-03-14

我把可观测的物体放入这样的阵列中。。。

var tasks$ = [];
tasks$.push(Observable.timer(1000));
tasks$.push(Observable.timer(3000));
tasks$.push(Observable.timer(10000));

我想要一个可观测的,当所有任务$完成时发出。请记住,在实践中,任务$没有已知数量的可观察对象。

我试过可观察。zip(任务$)。subscribe(),但如果只有一个任务,这似乎失败了,这让我相信ZIP需要偶数个元素才能按预期的方式工作。

我已经尝试了Observable.concat(任务$)。订阅(),但是Conconat运算符的结果似乎只是一个可观察的数组...例如,基本上与输入相同。你甚至不能订阅它。

在C语言中,这类似于任务。WhenAll()。在ES6 promise中,它类似于promise。all()

我遇到了许多这样的问题,但它们似乎都涉及到等待已知数量的流(例如,将它们映射到一起)。

共有3个答案

许涵容
2023-03-14
// waits for all Observables no matter of success/fails each of them
// returns array of items
// each item represent even first value of Observable or it's error
export function waitAll(args: Observable<any>[]): Observable<any[]> {
  const final = new Subject<any[]>();
  const flags = new Array(args.length);
  const result = new Array(args.length);
  let total = args.length;
  for (let i = 0; i < args.length; i++) {
    flags[i] = false;
    args[i].subscribe(
      res => {
        console.info('waitAll ' + i + ' ok ', res);
        if (flags[i] === false) {
          flags[i] = true;
          result[i] = res;
          total--;
          if (total < 1) {
            final.next(result);
          }
        }
      },
      error => {
        console.error('waitAll ' + i + ' failed ', error);
        if (flags[i] === false) {
          flags[i] = true;
          result[i] = error;
          total--;
          if (total < 1) {
            final.next(result);
          }
        }
      }
    );
  }
  return final.asObservable();
}

单元测试

describe('waitAll', () => {
  it('should wait for all observables', async () => {
    const o1 = new Subject();
    const o2 = new Subject();
    const o3 = new Subject();

    const o = waitAll([o1, o2, o3]);
    const res = {arr: []};
    o.subscribe(result => res.arr = result, err => res.arr = []);

    expect(res.arr).toEqual([]);
    o1.next('success1');
    expect(res.arr).toEqual([]);
    o2.error('failed2')
    expect(res.arr).toEqual([]);
    o3.next('success3')
    expect(res.arr).toEqual(['success1', 'failed2', 'success3']);

    o1.next('success1*');
    expect(res.arr).toEqual(['success1', 'failed2', 'success3']);
    o2.error('failed2*')
    expect(res.arr).toEqual(['success1', 'failed2', 'success3']);
    o3.next('success3*')
    expect(res.arr).toEqual(['success1', 'failed2', 'success3']);
  });
});
沈淇
2023-03-14

您可以使用zip

组合多个观察值以创建一个观察值,其值按顺序根据每个输入观察值的值计算。

const obsvA = this._service.getObjA();
const obsvB = this._service.getObjB();
// or with array
// const obsvArray = [obsvA, obsvB];

const zip = Observable.zip(obsvA, obsvB);
// or with array
// const zip = Observable.zip(...obsvArray);
zip.subscribe(
  result => console.log(result), // result is an array with the responses [respA, respB]
);

需要考虑的事项:

zip操作符将订阅所有内部观察值,等待每个观察值发出一个值。一旦发生这种情况,将发出具有相应索引的所有值。这将一直持续到至少一个内部可观察对象完成为止。

zip.subscribe(
  result => console.log(result), // result is an array with the responses [respA, respB]
  error => console.log(error), // will return the error message of the first observable that throws error and then finish it
  () => console.log ('completed after first error or if first observable finishes)
);
宓文斌
2023-03-14

如果您想组成一个在所有源观测值完成时发出的观测值,可以使用forkJoin

import { Observable } from 'rxjs/Observable';
import 'rxjs/add/observable/forkJoin';
import 'rxjs/add/operator/first';

var tasks$ = [];
tasks$.push(Observable.timer(1000).first());
tasks$.push(Observable.timer(3000).first());
tasks$.push(Observable.timer(10000).first());
Observable.forkJoin(...tasks$).subscribe(results => { console.log(results); });
 类似资料:
  • 问题内容: TL; DR 如何转换成? 我现有的代码使用Bolts构建了一系列异步任务,并等待所有这些任务完成后再执行其他步骤。本质上,它会构建a 并返回一个列表,当Bolts站点上的示例按照列表中的 所有 任务完成时,将其标记为已完成。 __ 我正在寻找替换方法,并且我假设这种方法可以构建异步任务列表(大小未知)并将它们全部包装为一个,但是我不知道如何。 我试着看,,等…但不能去工作的,我会被建

  • 有人能向我解释一下为什么运算符可以接受返回或的函数吗? 官方文件说: FlatMap运算符通过将您指定的函数应用于源可观察对象发出的每个项目来转换可观察对象,其中该函数返回本身发出项目的可观察对象。 为什么它也可以返回数组? 例如,它们都是有效的: 但这不起作用:

  • 我正在努力处理一个简单的RxJs查询,但我似乎无法理解。如果观察对象被包装在一个对象中,我似乎不知道如何合并它们。如果我直接从flatMap返回Observable,那么这个示例可以正常工作,但我还需要在输出中输入名称。我怎样才能做到这一点? 我正在使用RxJS 5.0.0-beta.2 基本数据结构: RxJs函数: 预期结果: 实际结果:

  • 用例:我正在开发一个Android应用程序,它有一个带有4个选项卡的视图页码,所有这些选项卡都是片段。对于每个选项卡/片段,我必须连接到具有 Oauth 的 REST Api,并且令牌每 5 分钟过期一次。 当前解决方案:使用RxJava和retryWhen操作符,我可以在收到401 HTTP错误时重新进行身份验证。对于订阅和消费的每个可观察流,使用: 因此,当令牌到期时,流将使用它,然后执行真正

  • 我已经实现了一个angular应用程序,它请求一个项目列表来填充一个表格。在我的服务中,我有以下函数,它从服务器请求项目列表: 编辑:表获取数据的方式: 在HTML中,我将数据源绑定到mat表

  • 问题是 我有一个活动,它定期从API获取数据并显示收到的数据。API 使用 OAuth,因此我会收到一个临时访问令牌,该令牌在一段时间(1 小时)后过期。如果应用尝试使用过期的令牌获取数据,则显然请求将失败。在我的应用的早期迭代中,我对网络请求使用 AsyncTasks,基本上只是执行了一个新的异步任务,该任务将在调用从服务器获取数据的主异步任务之前获取新的访问令牌。这工作得很好,因为主要的Asy