当前位置: 首页 > 面试题库 >

如何在可观察的流中处理前n个项目并以不同的方式处理其余项

樊胜
2023-03-14
问题内容

例如,

给定一定数量(m)的数字流(m1,m2,m3,m4,m5,m6 …),然后对前n个项应用变换(2 * i)(n可以小于,等于或大于m),对其余项应用另一个变换(3 * i)。和

返回结果:m1 * 2,m2 * 2,m3 * 3,m4 * 3,m5 * 3,m6 * 3 …(这里假设n = 2)。

我试图使用take(n)和skip(n),然后使用concatwith,但是take(n)看起来将删除序列中的其余项,然后使skip(n)不返回任何内容。


问题答案:

你可以分享你M的流,然后合并到一起take()skip()流,是这样的:

    int m = 10;
    int n = 8;
    Observable<Integer> numbersStream = Observable.just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
            .publish();

    Observable<Integer> firstNItemsStream = numbersStream.take(n)
            .map(i -> i * 2);

    Observable<Integer> remainingItemsStream = numbersStream.skip(n)
            .map(i -> i * 3);

    Observable.merge(firstNItemsStream, remainingItemsStream)
            .subscribe(integer -> System.out.println("result = " + integer));
    numbersStream.connect();

编辑:
@AE
Daphne指出,share()它将与第一个订阅者一起开始发射,因此,如果Observable已开始发射项目,则第二个订阅者可能会错过通知,因此在这种情况下,还有其他可能性:
cache() -将答复所有缓存发出的项目并将其回复给每个新订户,但会牺牲取消订阅的能力,因此需要谨慎使用。
reply().refCount()
-将创建Observable的是reply()以前所有项目的每个新用户(类似缓存),但会取消时,从它的最后一个订户退订。


在这两种情况下,都应考虑内存,因为Observable它将在内存中缓存所有发出的项目。

publish() -在不缓存所有先前项目的情况下,另一种可能性是使用publish()create
ConnectableObservable,并connect()在所有必需的订阅者都订阅之后调用它的方法来开始发射,这样将获得同步,并且所有订阅者将正确获得所有通知。



 类似资料:
  • 我正在尝试实现一个Spring批处理作业,为了处理记录,它需要2-3个db调用,这会减慢记录的处理速度(大小为100万)。如果我使用基于块的处理,它会单独处理每条记录,性能会很慢。因此,我需要一次性处理1000条记录,作为批量处理,这将减少数据库调用,并提高性能。但我的问题是,如果我实现Tasklet,那么我也会失去可重启性和重试/跳过功能,如果使用AggregateInputReader实现,我

  • 我在谷歌上搜索了一下,并试图用我找到的例子尽可能好地解决这个问题,但是唉...没有成功。 使命: 打开模式,并显示用于选择已存在用户的复选框 错误: 问题: 在选择一个人之前,新的或现有的玩家是“未定义的”。因此,我创建了一个“teamPlayerDefault”js对象,其数据类似于api调用返回的数据(尚未实现)。这是用来初始化的 JSFiddle链接: 点击这里。。。 代码:

  • 我们使用Spring Batch进行一些处理,通过Reader读取一些ID,我们希望通过处理器将它们处理为“块”,然后写入多个文件。但是处理器接口一次只允许处理一个项目,我们需要进行批量处理,因为处理器依赖于第三方,不能为每个项目调用服务。 我看到我们可以为“块”中涉及的所有读取器-处理器-写入器创建包装器,以处理列表<>并委托给一些具体的读取器/处理器/写入器。但这对我来说并不是件好事。像这样:

  • 我正在使用java-8进行多个Spring boot项目,但现在其中一个需要转移到java-11。所以我需要在笔记本电脑上安装两个版本。那么如何在同一台机器上为不同的项目处理不同的JAVA版本呢?

  • 问题内容: 考虑以下示例: 这将输出从1到5的数字,然后打印异常。 我要实现的是使观察者保持订阅状态,并在引发异常后继续运行,即打印从1到10的所有数字。 我曾尝试使用和其他各种错误处理运算符,但是,正如文档中所述,它们的目的是处理可观察对象自身发出的错误。 最直接的解决方案是将整个过程包装到try- catch块中,但这对我来说似乎不是一个好的解决方案。在类似的Rx.NET问题中,提出的解决方案

  • 我试图理解方法是如何精确地处理并行流的,我不理解为什么下面的代码不返回这些字符串的串联。 代码如下: 该代码仅适用于顺序流,但对于并行流,它不会返回串联。每次输出都不同。有人能解释一下那里发生了什么事吗?