当前位置: 首页 > 知识库问答 >
问题:

RX:如何等待订阅者完成?

逄边浩
2023-03-14

我有一个生产者,它从Rest API下载页面中的数据,以及几个处理页面的消费者(例如,将它们加载到数据库中)。

我希望生产者和消费者并行工作,这意味着生产者不应该等到一个页面被消费后再下载下一个页面。每个使用者都需要按顺序处理页面。

当下载所有页面时,主线程应该等待所有消费者完成他们的工作(因为消费可能比生产需要更长的时间)。

我目前的做法如下:

我已经创建了一个下载页面的可观察对象,它在附加消费者订阅者后立即开始。我将订阅者配置为在自己的线程上观察以进行并行执行。

C# 中的代码:

IEnumerable<Page> getPages = produce();
var observable = getPages.ToObservable().Publish();

observable
   .ObserveOn(NewThreadScheduler.Default)
   .Subscribe(page => consume1(page));

observable
   .ObserveOn(NewThreadScheduler.Default)
   .Subscribe(page => consume2(page));

observable.Connect();

这种实现的问题是,主线程可能会在所有页面都处理完毕且应用程序停止之前结束。

如何使用 RX 实现此目的?

谢啦!

编辑:

还尝试了以下方法(来自答案):

static void Main(string[] args)
{
    var getPages = Enumerable.Range(0, 10);

    var els1 = new EventLoopScheduler();
    var els2 = new EventLoopScheduler();

    var observable =
        getPages
            .ToObservable()
            .Publish(ps =>
                Observable
                    .Merge(
                        ps.Select(p => Observable.Start(() => consume1(p), els1)),
                        ps.Select(p => Observable.Start(() => consume2(p), els2))));

    observable.Wait();
}

public static void consume1(int p)
{
    Console.WriteLine($"1:{p}");
    Thread.Sleep(200);
}

public static void consume2(int p)
{
    Console.WriteLine($"2:{p}");
    Thread.Sleep(100);
}

observable.Wait()在基础可枚举完成产生值后立即返回。输出是:

1:0
2:0

只是为了证明,如果我们用以下命令替换 getPages:

var getPages = Enumerable.Range(0, 10)
    .Select(i =>
    {
        Console.WriteLine($"Produced {i}");
        Thread.Sleep(30);
        return i;
    });

那么输出是:

Produced 0
Produced 1
1:0
2:0
Produced 2
Produced 3
Produced 4
2:1
Produced 5
Produced 6
Produced 7
1:1
2:2
Produced 8
Produced 9

共有1个答案

贺兴平
2023-03-14

我想这正是你想要的:

var els1 = new EventLoopScheduler();
var els2 = new EventLoopScheduler();

var observable =
    getPages
        .ToObservable()
        .Publish(ps =>
            Observable
                .Merge(
                    ps.SelectMany(p => Observable.Start(() => consume1(p), els1)),
                    ps.SelectMany(p => Observable.Start(() => consume2(p), els2))));

我写了这个测试代码:

var getPages = Enumerable.Range(0, 10);

var els1 = new EventLoopScheduler();
var els2 = new EventLoopScheduler();

var observable =
    getPages
        .ToObservable()
        .Publish(ps =>
            Observable
                .Merge(
                    ps.SelectMany(p => Observable.Start(() => consume1(p), els1)),
                    ps.SelectMany(p => Observable.Start(() => consume2(p), els2))));

observable.Wait();

public void consume1(int p)
{
    Console.WriteLine($"1:{p}");
    Thread.Sleep(200);
}

public void consume2(int p)
{
    Console.WriteLine($"2:{p}");
    Thread.Sleep(100);
}

我得到了这样的输出:

1:0
2:0
2:1
1:1
2:2
2:3
1:2
2:4
2:5
1:3
2:6
2:7
1:4
2:8
2:9
1:5
1:6
1:7
1:8
1:9

当您完成EventLoopScheduler实例时,您应该调用。对它们进行Dispose()以关闭线程。

 类似资料:
  • 我有以下js代码: 下面的服务器代码(最后一行调用它): 有时前端会忽略

  • 我通读了RxJS文档,并希望确保我理解了< code > subscriber . unsubscribe()和< code > subscriber . complete()之间的区别。 假设我有一个有两个订阅者的可观察对象,订阅者1和订阅者2。如果订阅者1对其订阅调用取消订阅,它将不再接收来自可观察对象的通知,但订阅者2将继续接收它们。 <代码>的文档。complete(): 观察者回调,用于

  • 我有这个问题,我一直在寻找,但找不到解决方案(或者也许我不能根据其他答案做出解决方案)。 我的问题是,我需要找到一种方法来等待可观察的(有自己的订户)并等待另一个可观察的(有自己的订户)完成。 场景是这样的: 奥布1- 奥布斯2 - 我主要担心的是我需要两个订阅者。在我看来,obs1 和 obs2 并行运行,但需要检查 obs1 是否以新的会话令牌完成。也许这不是RxJava的主要目的。 Obs1

  • 问题内容: 我的问题: 如何在a上执行一堆线程对象并等待它们全部完成后再继续? 我是ThreadPoolExecutor的新手。因此,此代码是测试以了解其工作方式。现在我什至都不用对象填充,因为我不理解如何在不调用另一个队列的情况下开始队列。无论如何,现在我只是打电话给我,但我认为我仍然缺少一些东西。任何提示都很棒!谢谢。 RunnableObject类: 问题答案: 你应该循环

  • 我希望能够等待一个可观察的时间,例如。 天真的尝试会导致等待立即解析,而不会阻止执行 编辑:我的全部预期用例的伪代码是: 我知道我可以将其他代码移动到另一个单独的函数中,并将其传递到subscribe回调中,但我希望能够避免这种情况。

  • 我在spring中有一个服务,它需要使用十种不同的方法获取数据。 我希望这些方法并行执行,以执行一些DB操作并返回到父线程。但是父线程应该等到所有响应出现,然后返回响应。 在我当前的方法中,我使用反应式mono异步执行所有方法,但主线程不等待订阅者方法完成。 下面是我订阅的两种方法 下面是我的主要方法 以下是我的输出: 我的输出显示,主线程没有等待订阅服务器完成其任务,因此我如何处理这种情况?