我有一个生产者,它从Rest API下载页面中的数据,以及几个处理页面的消费者(例如,将它们加载到数据库中)。
我希望生产者和消费者并行工作,这意味着生产者不应该等到一个页面被消费后再下载下一个页面。每个使用者都需要按顺序处理页面。
当下载所有页面时,主线程应该等待所有消费者完成他们的工作(因为消费可能比生产需要更长的时间)。
我目前的做法如下:
我已经创建了一个下载页面的可观察对象,它在附加消费者订阅者后立即开始。我将订阅者配置为在自己的线程上观察以进行并行执行。
C# 中的代码:
IEnumerable<Page> getPages = produce();
var observable = getPages.ToObservable().Publish();
observable
.ObserveOn(NewThreadScheduler.Default)
.Subscribe(page => consume1(page));
observable
.ObserveOn(NewThreadScheduler.Default)
.Subscribe(page => consume2(page));
observable.Connect();
这种实现的问题是,主线程可能会在所有页面都处理完毕且应用程序停止之前结束。
如何使用 RX 实现此目的?
谢啦!
编辑:
还尝试了以下方法(来自答案):
static void Main(string[] args)
{
var getPages = Enumerable.Range(0, 10);
var els1 = new EventLoopScheduler();
var els2 = new EventLoopScheduler();
var observable =
getPages
.ToObservable()
.Publish(ps =>
Observable
.Merge(
ps.Select(p => Observable.Start(() => consume1(p), els1)),
ps.Select(p => Observable.Start(() => consume2(p), els2))));
observable.Wait();
}
public static void consume1(int p)
{
Console.WriteLine($"1:{p}");
Thread.Sleep(200);
}
public static void consume2(int p)
{
Console.WriteLine($"2:{p}");
Thread.Sleep(100);
}
observable.Wait()在基础可枚举完成产生值后立即返回。输出是:
1:0
2:0
只是为了证明,如果我们用以下命令替换 getPages:
var getPages = Enumerable.Range(0, 10)
.Select(i =>
{
Console.WriteLine($"Produced {i}");
Thread.Sleep(30);
return i;
});
那么输出是:
Produced 0
Produced 1
1:0
2:0
Produced 2
Produced 3
Produced 4
2:1
Produced 5
Produced 6
Produced 7
1:1
2:2
Produced 8
Produced 9
我想这正是你想要的:
var els1 = new EventLoopScheduler();
var els2 = new EventLoopScheduler();
var observable =
getPages
.ToObservable()
.Publish(ps =>
Observable
.Merge(
ps.SelectMany(p => Observable.Start(() => consume1(p), els1)),
ps.SelectMany(p => Observable.Start(() => consume2(p), els2))));
我写了这个测试代码:
var getPages = Enumerable.Range(0, 10);
var els1 = new EventLoopScheduler();
var els2 = new EventLoopScheduler();
var observable =
getPages
.ToObservable()
.Publish(ps =>
Observable
.Merge(
ps.SelectMany(p => Observable.Start(() => consume1(p), els1)),
ps.SelectMany(p => Observable.Start(() => consume2(p), els2))));
observable.Wait();
public void consume1(int p)
{
Console.WriteLine($"1:{p}");
Thread.Sleep(200);
}
public void consume2(int p)
{
Console.WriteLine($"2:{p}");
Thread.Sleep(100);
}
我得到了这样的输出:
1:0 2:0 2:1 1:1 2:2 2:3 1:2 2:4 2:5 1:3 2:6 2:7 1:4 2:8 2:9 1:5 1:6 1:7 1:8 1:9
当您完成EventLoopScheduler
实例时,您应该调用。对它们进行Dispose()
以关闭线程。
我有以下js代码: 下面的服务器代码(最后一行调用它): 有时前端会忽略
我通读了RxJS文档,并希望确保我理解了< code > subscriber . unsubscribe()和< code > subscriber . complete()之间的区别。 假设我有一个有两个订阅者的可观察对象,订阅者1和订阅者2。如果订阅者1对其订阅调用取消订阅,它将不再接收来自可观察对象的通知,但订阅者2将继续接收它们。 <代码>的文档。complete(): 观察者回调,用于
我有这个问题,我一直在寻找,但找不到解决方案(或者也许我不能根据其他答案做出解决方案)。 我的问题是,我需要找到一种方法来等待可观察的(有自己的订户)并等待另一个可观察的(有自己的订户)完成。 场景是这样的: 奥布1- 奥布斯2 - 我主要担心的是我需要两个订阅者。在我看来,obs1 和 obs2 并行运行,但需要检查 obs1 是否以新的会话令牌完成。也许这不是RxJava的主要目的。 Obs1
问题内容: 我的问题: 如何在a上执行一堆线程对象并等待它们全部完成后再继续? 我是ThreadPoolExecutor的新手。因此,此代码是测试以了解其工作方式。现在我什至都不用对象填充,因为我不理解如何在不调用另一个队列的情况下开始队列。无论如何,现在我只是打电话给我,但我认为我仍然缺少一些东西。任何提示都很棒!谢谢。 RunnableObject类: 问题答案: 你应该循环
我希望能够等待一个可观察的时间,例如。 天真的尝试会导致等待立即解析,而不会阻止执行 编辑:我的全部预期用例的伪代码是: 我知道我可以将其他代码移动到另一个单独的函数中,并将其传递到subscribe回调中,但我希望能够避免这种情况。
我在spring中有一个服务,它需要使用十种不同的方法获取数据。 我希望这些方法并行执行,以执行一些DB操作并返回到父线程。但是父线程应该等到所有响应出现,然后返回响应。 在我当前的方法中,我使用反应式mono异步执行所有方法,但主线程不等待订阅者方法完成。 下面是我订阅的两种方法 下面是我的主要方法 以下是我的输出: 我的输出显示,主线程没有等待订阅服务器完成其任务,因此我如何处理这种情况?