当前位置: 首页 > 知识库问答 >
问题:

RxJs:zip操作符的有损形式

拓拔弘扬
2023-03-14

考虑使用zip操作符将两个无限的可观测值压缩在一起,其中一个发射项目的频率是另一个的两倍
当前的实现是无损耗的,即如果我将这些可见光发射一个小时,然后在它们的发射率之间切换,第一个可见光将最终赶上另一个可见光
随着缓冲区越来越大,这将在某个时候导致内存爆炸<同样的情况也会发生,如果第一个可观测对象将发射项目数小时,而第二个将在最后发射一个项目。

如何实现此运算符的有损行为?我只想在任何时候排放两条流的排放物,我不在乎我错过了多少更快的流的排放物。

澄清:

  • 我在这里试图解决的主要问题是由于zip运算符的无损失特性而导致的内存爆炸。
  • 我想在我从两个流获得排放的任何时候发射,即使两个流每次发射相同的值

例子:

Stream1: 1 2    3 4    5 6 7                
Stream2:     10     20       30 40 50 60 70

常规邮政编码将产生以下输出:

[1, 10]
[2, 20]
[3, 30]
[4, 40]
[5, 50]
[6, 60]
[7, 70]
const Observable = Rx.Observable;
const Subject = Rx.Subject;


const s1 = new Subject();
const s2 = new Subject();

Observable.zip(s1,s2).subscribe(console.log);

s1.next(1); s1.next(2); s2.next(10); s1.next(3); s1.next(4); s2.next(20); s1.next(5); s1.next(6); s1.next(7); s2.next(30); 
 
s2.next(40); s2.next(50); s2.next(60); s2.next(70); 
<script src="https://unpkg.com/@reactivex/rxjs@5.0.3/dist/global/Rx.js"></script>

我希望它产生的输出:

[1, 10]
[3, 20]
[5, 30]

说明:
有损压缩运算符是缓冲区大小为1的压缩运算符。这意味着它将只保留第一个发射流中的第一个项目,并将丢失所有剩余的项目(在第一个项目和第二个流中的第一个发射之间到达的项目)。因此,示例中发生的情况如下:stream1发射,lossy-zip“记住”它并忽略stream1上的所有项目,直到stream2发射。流2的首次排放量为10,因此流1松开。相互发射(有损压缩编码的第一次发射)后,它会重新开始:“记住”3、“松散”4、、发射。然后重新开始:“记住”<代码>5)、“松开”<代码>6和<代码>7,发出<代码>[5,30]<代码>。然后重新开始:“记住”<代码>40)、“松开”<代码>50、<代码>60、<代码>70,然后等待流1上的下一项。

示例2:

Stream1: 1 2 3 ... 100000000000
Stream2:                        a

在这种情况下,常规的zip操作符会破坏内存<我不想这样。

摘要:
本质上我期望有损zip运算符只记住stream 1在之前的相互发射之后发出的第一个值,并在stream 2赶上stream 1时发出。并重复。


共有3个答案

单于奕
2023-03-14
匿名用户

我认为以下应该总是从每个源可观察到的最后一个值。

const source1 = Observable.interval(1000).publish();
const source2 = Observable.interval(300).publish();

source1.connect();
source2.connect();

Observable.defer(() => Observable.forkJoin(
        source1.takeUntil(source2.skipUntil(source1)),
        source2.takeUntil(source1.skipUntil(source2))
    ))
    .take(1)
    .repeat()
    .subscribe(console.log);

现场演示:http://jsbin.com/vawewew/11/edit?js安慰

此打印:

[ 0, 2 ]
[ 1, 5 ]
[ 2, 8 ]
[ 3, 12 ]
[ 4, 15 ]
[ 5, 18 ]

您可能需要将源1和源2转换为热可观察对象(如果它们还没有)。

编辑:

核心部分是source1。takeUntil(source2.skipUntil(source1))。这将从源1获取值,直到源2发出。但同时它将忽略源1,直到源2发出至少一个值:)。

forkJoin()可观察工作等待两个源完成,同时记住每个源的最后一次发射。

然后我们想重复这个过程,所以我们使用take(1)来完成这个链和。重复()。

黄锋
2023-03-14

这给出了序列[0,2][1,5][2,8][3,12]...

const interval1 = Rx.Observable.interval(1000)
const interval2 = Rx.Observable.interval(300)

const combined = Rx.Observable.combineLatest(interval1, interval2);
const fresh = combined.scan((acc, x) => { 
    return x[0] === acc[0] || x[1] === acc[1] ? acc : x 
  })
  .distinctUntilChanged() //fresh ones only

fresh.subscribe(console.log);

可以说操作员更少。不过不确定它的效率如何。
CodePen

对于更新#3,

然后,您需要为每个源项提供一个键。

// Simulated sources according to latest spec provided (update #3)
const source1 = Rx.Observable.from(['x','y','z'])
const source2 = Rx.Observable.from(['a','a','b','b','c'])

// Create keys for sources
let key1 = 0
let key2 = 0
const keyed1 = source1.map(x => [x, key1++])
const keyed2 = source2.map(x => [x, key2++])

const combined = Rx.Observable
  .combineLatest(keyed1, keyed2)
  .map(([keyed1, keyed2]) => [...keyed1, ...keyed2]) // to simplify scan below
combined.subscribe(console.log) // not the output, for illustration only
console.log('-------------------------------------')

const fresh = combined.scan((acc, x) => { 
    return x[1] === acc[1] || x[3] === acc[3] ? acc : x 
  })
  .distinctUntilChanged() //fresh ones only

const dekeyed = fresh
  .map(keyed => { return [keyed[0], keyed[2]] })
dekeyed.subscribe(console.log); // required output

这就产生了

["x", "a"]  
["y", "a"]  
["z", "b"]  

CodePen(打开控制台后刷新CodePen页面,以便更好地显示)

桂玉石
2023-03-14

以下内容将为您提供所需的行为:

Observable.zip(s1.take(1), s2.take(1)).repeat()

RxJs 5.5管道语法中:

zip(s1.pipe(take(1)), s2.pipe(take(1))).pipe(repeat());
const s1 = new Rx.Subject();
const s2 = new Rx.Subject();

Rx.Observable.zip(s1.take(1), s2.take(1)).repeat()
    .subscribe(console.log);

s1.next(1); s1.next(2); s2.next(10); s1.next(3); s1.next(4); s2.next(20); s1.next(5); s1.next(6); s1.next(7); s2.next(30);  
s2.next(40); s2.next(50); s2.next(60); s2.next(70); 
<script src="https://unpkg.com/@reactivex/rxjs@5.0.3/dist/global/Rx.js"></script>
 类似资料:
  • buffer buffer() 操作符的函数签名: buffer([breakObservable]) buffer 本身意味着我们在等待而不会发出任何值,直到 breakObservable 发生。示例如下: let breakWhen$ = Rx.Observable.timer(1000); let stream$ = Rx.Observable.interval(200) .buffer(

  • 这可不是一个简单的话题。其中涉及了应用程序中的诸多领域,你可能想要同步 API 的响应,或者你想要处理其它类型的流,比如 UI 中的点击事件或键盘事件。 有大量的操作符以它们各自的方式来处理时间,比如 delay、 debounce、 throttle、 interval, 等等。 interval 这个操作符用来创建一个 Observable,基本上它所做的就是按固定的时间间隔提供值,函数签名如

  • max let stream$ = Rx.Observable.of(5,4,7,-1) .max(); 发出的值是7。这个操作符的功能显而易见,只提供一个最大值。还有不同的方式来调用它,可以传入一个 comparer 函数: function comparer(x,y) { if( x > y ) { return 1; } else if( x < y ) {

  • 有一些操作符允许你组合两个及以上的 source,它们的行为有所不同,重要的是要知道它们之间的区别。 combineLatest 函数签名如下: Rx.Observable.combineLatest([ source_1, ... source_n]) let source1 = Rx.Observable.interval(100) .map( val => "source1 " + val

  • 这个类别的全部是展示以某些东西为基础来创建 Observables 是多么的容易,因此他们可以和操作符配合的很好,而不在乎是怎样的构造,从而实现丰富的组合。 from 在 RxJS 4中,存在一些类似名称的操作符,例如 fromArray()、from()、fromPromise() 等等。所有这些 fromXXX 的操作符现在全由 from() 接管了。来看一些示例: 老的 fromArray

  • create 当你刚起步或者只是想要测试一些东西时,倾向于从 create() 操作符入手。它接收一个有 observer 参数的函数。在前面的一些章节中已提及过,比如 Observable 包装章节。函数签名如下: Rx.Observable.create([fn]) 示例如下: Rx.Observable.create(observer => { observer.next( 1 );