当前位置: 首页 > 文档资料 > 学习 RxJS 操作符 >

mergeMap

优质
小牛编辑
141浏览
2023-12-01

mergeMap

函数签名: mergeMap(project: function: Observable, resultSelector: function: any, concurrent: number): Observable

映射成 observable 并发出值。


flatMap 是 mergeMap 的别名!

如果同一时间应该只有一个内部 subscription 是有效的,请尝试 switchMap

如果内部 observables 发送和订阅的顺序很重要,请尝试 concatMap


为什么使用 mergeMap

当想要打平内部 observable 并手动控制内部订阅数量时,此操作符是最适合的。

例如,当使用 switchMap 时,源 observable 发出值时,每个内部订阅都是完成的,只允许存在一个活动的内部订阅。与此相反,mergeMap 允许同一时间存在多个活动的内部订阅。正因为如此,mergeMap 最常见的用例便是不会被取消的请求,可以将其考虑成写,而不是读。注意如果需要考虑顺序的话, concatMap 会是更好的选择。

注意,因为 mergeMap 同时维护多个活动的内部订阅,由于这些长期活动的内部订阅,所以是有可能产生内存泄露的。举个例子,如果你将 observable 映射成内部的定时器或 DOM 事件流。在这些案例中,如果你仍然想用 mergeMap 的话,你应该利用另一个操作符来管理内部订阅的完成,比如 taketakeUntil。你还可以使用 concurrent 参数来限制活动的内部订阅的数量,参见 示例 4

示例

示例 1: 使用 observable 进行 mergeMap

( StackBlitz | jsBin | jsFiddle )

// RxJS v6+
import { of } from 'rxjs';
import { mergeMap } from 'rxjs/operators';

// 发出 'Hello'
const source = of('Hello');
// 映射成 observable 并将其打平
const example = source.pipe(mergeMap(val => of(`${val} World!`)));
// 输出: 'Hello World!'
const subscribe = example.subscribe(val => console.log(val));
示例 2: 使用 promise 进行 mergeMap

( StackBlitz | jsBin | jsFiddle )

// RxJS v6+
import { of } from 'rxjs';
import { mergeMap } from 'rxjs/operators';

// 发出 'Hello'
const source = of('Hello');
// mergeMap 还会发出 promise 的结果
const myPromise = val =>
  new Promise(resolve => resolve(`${val} World From Promise!`));
// 映射成 promise 并发出结果
const example = source.pipe(mergeMap(val => myPromise(val)));
// 输出: 'Hello World From Promise'
const subscribe = example.subscribe(val => console.log(val));
示例 3: 使用 resultSelector 函数进行 mergeMap

( StackBlitz | jsBin | jsFiddle )

// RxJS v6+
import { of } from 'rxjs';
import { mergeMap } from 'rxjs/operators';

/*
  你还可以提供第二个参数,它接收源 observable 的值并发出内部 observable 或 promise 的值
*/
// 发出 'Hello'
const source = of('Hello');
// mergeMap 还会发出 promise 的结果
const myPromise = val =>
  new Promise(resolve => resolve(`${val} World From Promise!`));
const example = source.pipe(
  mergeMap(
    val => myPromise(val),
    (valueFromSource, valueFromPromise) => {
      return `Source: ${valueFromSource}, Promise: ${valueFromPromise}`;
    }
  )
);
// 输出: "Source: Hello, Promise: Hello World From Promise!"
const subscribe = example.subscribe(val => console.log(val));
示例 4: 使用 concurrent 值进行 mergeMap

( StackBlitz | jsBin | jsFiddle )

// RxJS v6+
import { interval } from 'rxjs';
import { mergeMap, take } from 'rxjs/operators';

// 每1秒发出值
const source = interval(1000);

const example = source.pipe(
  mergeMap(
    //project
    val => interval(5000).pipe(take(2)),
    //resultSelector
    (oVal, iVal, oIndex, iIndex) => [oIndex, oVal, iIndex, iVal],
    //concurrent
    2
  )
);
/*
        输出:
        [0, 0, 0, 0] <--第一个内部 observable
        [1, 1, 0, 0] <--第二个内部 observable
        [0, 0, 1, 1] <--第一个内部 observable
        [1, 1, 1, 1] <--第二个内部 observable
        [2, 2, 0, 0] <--第三个内部 observable
        [3, 3, 0, 0] <--第四个内部 observable
*/
const subscribe = example.subscribe(val => console.log(val));

相关食谱

其他资源


源码: https://github.com/ReactiveX/rxjs/blob/master/src/internal/operators/mergeMap.ts