当前位置: 首页 > 知识库问答 >
问题:

RxJava—一个生产者,多个并发消费者,在单个订阅中

解浩渺
2023-03-14

我正在尝试了解RxJava并发的一些细节,但我不确定我的想法是否正确。我对SubscribeOn/观察的工作原理有很好的了解,但我正在尝试确定池调度程序的一些细节。为此,我正在考虑尽可能简单地实现一个1-N生产者-消费者链,其中消费者的数量与CPU一样多。

根据文档,Schedulers.computation()由与内核一样多的线程池支持。但是,根据Reactive合约,运算符只能获得顺序调用。

因此,像这样的设置

Observable.range(1, 1000) // Whatever has to be processed
            .observeOn(Schedulers.computation())
            .doOnNext(/* heavy computation */)
            .doOnCompleted(() -> System.out.println("COMPLETED"))
            .forEach(System.out::println);

尽管使用了线程池,但只会收到对doOnNext的并发调用。对睡眠和检查操作观察On.java的实验似乎证实了这一点,因为每个观察调用都会获得一个工作器。此外,如果不是这样,应该会有一个复杂的OnComplated管理,必须等待任何挂起的OnNext完成,我发现不存在。

假设我在这里走对了(也就是说,只涉及一个线程,尽管你可以用observeOn跳过其中几个线程),那么正确的模式是什么?我可以找到相反情况的示例(将多个异步事件生成器同步到一个使用者中),但这不是一个简单的示例。

我想flatMap也参与其中,可能是使用了限制并发订阅数量的beta版(1.x版)。也许可以像这样简单地使用窗口/平面图?

Observable
.range(1, 1000) // Whatever has to be processed
.window(1) // Emit one observable per item, for example 
.flatMap(/* Processing */, 4) // For 4-concurrent processing
.subscribe()

在这种方法中,我仍然缺少一种以Rx通用方式最大化CPU的简单方法(即,指定计算调度器,而不是flatMap的最大订阅数)。所以,也许……:

Observable
.range(1, 1000) // Whatever has to be processed
.window(1) // Emit one observable per item, for example 
.flatMap(v -> Observable.just(v)
                        .observeOn(Schedulers.computation())
                        .map(/* heavy parallel computation */))
.subscribe()

最后,在一些使用flatMap的示例中,我看到在flatMap之后有一个调用,我不知道为什么需要这个调用,因为flatMap不应该为下游执行序列化吗?(例如,在本例中:http://akarnokd.blogspot.com.es/2016/02/flatmap-part-1.html)

共有1个答案

宋劲
2023-03-14

Thomas Nield有篇很好的文章就是关于那个案子的

RxJava-最大化并行化

我个人在这种情况下所做的,我只是在具有最大并发调用参数的平面地图中订阅Schedulers.io

    Observable.range(1, 1000) // Whatever has to be processed
            .flatMap(v -> Observable.fromCallable(() -> { /* io calls */}) 
                    .subscribeOn(Schedulers.io()), Runtime.getRuntime().availableProcessors() + 1)
            .subscribe();

根据评论中的建议进行编辑。最好使用调度程序。计算()用于CPU限制的工作

    Observable.range(1, 1000) // Whatever has to be processed
            .flatMap(v -> Observable.fromCallable(() -> { /* intense calculation */}) 
                    .subscribeOn(Schedulers.computation()))
            .subscribe();
 类似资料:
  • 我有一个使用ActiveMQ的消息队列。web请求用persistency=true将消息放入队列。现在,我有两个消费者,它们都作为单独的会话连接到这个队列。使用者1总是确认消息,但使用者2从不这样做。 JMS队列实现负载平衡器语义。一条消息将被一个使用者接收。如果在发送消息时没有可用的使用者,它将被保留,直到有可以处理消息的使用者可用为止。如果使用者接收到一条消息,但在关闭之前没有确认它,那么该

  • 我有三根线。线程1(T1)是生成器,它生成数据。线程2和线程3(T2和T3)分别等待T1的数据在单独的循环中处理。我正在考虑在线程之间共享BlockingQueue,并通过调用“Take”让T2和T3等待。

  • 我有一个生产者/消费者场景,我不希望一个生产者交付产品,多个消费者消费这些产品。然而,常见的情况是,交付的产品只被一个消费者消费,而其他消费者从未看到过这个特定的产品。我不想实现的是,一个产品被每个消费者消费一次,而没有任何形式的阻碍。 我的第一个想法是使用多个BlockingQueue,每个消费者使用一个,并使生产者将每个产品按顺序放入所有可用的BlockingQueues中。但是,如果其中一个

  • 问题内容: 因此,我已经看到了许多在Go中实现一个消费者和许多生产者的方法-Go 并发中的经典fanIn函数。 我想要的是fanOut功能。它以一个通道作为参数,它从中读取一个值,并返回一个通道片,该通道将这个值的副本写入其中。 有没有正确/推荐的方法来实现这一目标? 问题答案: 您几乎描述了执行此操作的最佳方法,但这是执行此操作的一小段代码示例。 去游乐场:https : //play.gola

  • 问题内容: 我有一个JMS客户端,它正在生成消息并通过JMS队列发送到其唯一的使用者。 我想要的是不止一个消费者收到这些消息。我想到的第一件事是将队列转换为主题,以便现有用户和新用户都可以订阅并将相同的消息传递给他们。 显然,这将涉及在生产者和消费者方面修改当前的客户代码。 我还要查看其他选项,例如创建第二个队列,这样就不必修改现有的使用者。我相信这种方法有很多优点,例如(如果我错了,请纠正我)在

  • 我正在寻找一种将多个订阅者附加到RxJava可观察流的方法,每个订阅者异步处理发出的事件。 我第一次尝试使用。flatMap(),但这似乎对任何后续订阅服务器都不起作用。所有订阅服务器都在同一线程上处理事件。 最终工作的是通过每次创建一个新的可观察的来消耗新线程中的每个事件: 输出: 以及多个订阅者的最终结果: 输出: 然而,这似乎有点笨拙。有没有更优雅的解决方案,或者RxJava不是一个很好的用