当前位置: 首页 > 知识库问答 >
问题:

RxPy:如何从外部回调创建热可观察并订阅多个异步进程?

有骏奇
2023-03-14

我有一个外部服务(ExternalDummyService),在其中注册回调。我想从该回调创建一个可观察的对象,并订阅多个异步进程。

pyfiddle中的完整代码:https://pyfiddle.io/fiddle/da1e1d53-2e34-4742-a0b9-07838f2c13df*请注意,在pyfiddle版本中,“睡眠”被替换为“范围(10000)内的i: foo=i”,因为睡眠无法正常工作。

主要代码如下:

thread = ExternalDummyService()
external_obs = thread.subject.publish()

external_obs.subscribe(slow_process)
external_obs.subscribe(fast_process)
external_obs.connect()

thread.start()

class ExternalDummyService(Thread):
    def __init__(self):
        self.subject = Subject()

    def run(self):
        for i in range(5):
            dummy_msg = { ... }
            self.subject.on_next(dummy_msg)

def fast_process(msg):
    print("FAST {0} {1}".format(msg["counter"], 1000*(time() - msg["timestamp"])))
    sleep(0.1)

def slow_process(msg):
    print("SLOW {0} {1}".format(msg["counter"], 1000*(time() - msg["timestamp"])))
    sleep(1)

我得到的输出是这个,两个进程同步运行,在两个进程完成每次执行之前,ExternalDummyService不会发出新值:

emitting 0
STARTED
SLOW 0 1.0008811950683594
FAST 0 2.0122528076171875
emitting 1
SLOW 1 1.5070438385009766
FAST 1 1.5070438385009766
emitting 2
SLOW 2 0.5052089691162109
FAST 2 0.9891986846923828
emitting 3
SLOW 3 1.0006427764892578
FAST 3 1.0006427764892578
emitting 4
SLOW 4 1.0013580322265625
FAST 4 1.0013580322265625

FINISHED

我希望得到这样的结果,即服务在不等待进程运行和进程异步运行的情况下发出消息:

STARTED
emitting 0
emitting 1
emitting 2
FAST 0 2.0122528076171875
FAST 1 1.5070438385009766
emitting 3
SLOW 0 1.0008811950683594
FAST 2 0.9891986846923828
emitting 4
FAST 3 1.0006427764892578
SLOW 1 1.5070438385009766
FAST 4 1.0013580322265625
SLOW 2 0.5052089691162109
SLOW 3 1.0006427764892578
SLOW 4 1.0013580322265625

FINISHED

我尝试过share()、ThreadPoolScheduler和其他我不知道我在做什么的东西。

谢啦!

共有1个答案

邵弘致
2023-03-14

使用这个问题的答案:具有多个订阅者和事件的RxJava并发

... 我已使用此代码达到预期效果:

optimal_thread_count = cpu_count()
pool_scheduler = ThreadPoolScheduler(optimal_thread_count)

thread = ExternalDummyService()
external_obs = thread.subject.publish()

external_obs \
    .flat_map(lambda msg: Observable.just(msg).subscribe_on(pool_scheduler)) \
    .subscribe(fast_process)

external_obs \
    .flat_map(lambda msg: Observable.just(msg).subscribe_on(pool_scheduler)) \
    .subscribe(slow_process)

external_obs.connect()

thread.start()

完整版本:https://pyfiddle.io/fiddle/20f8871c-48d6-4d6b-b1a4-fdd0a4aa6f95/?m=Saved小提琴

输出为:

emitting 0
emitting 1
emitting 2
emitting 3
emitting 4
FAST 0 52.629709243774414
FAST 1 51.12814903259277
FAST 2 100.2051830291748
FAST 3 151.2434482574463
SLOW 0 503.0245780944824
SLOW 1 502.0263195037842
FAST 4 548.7725734710693
SLOW 2 551.4400005340576
SLOW 3 652.1098613739014
SLOW 4 1000.3445148468018

请随时提出改进建议。

 类似资料:
  • 我不知道如何从Observable中提取值,由Observable所在的函数返回。我只需要从中返回一个值,其他什么都不需要。 最新版本有效吗 我需要它工作,函数返回值,然后: 我做错了什么?

  • 我必须从两个订阅服务器获取数据,但我总是获取第一个订阅服务器的数据。 我有一个数据共享服务: 在离开搜索组件之前,我调用update方法。 现在,我在results组件上。我得到的共享数据如下: 我的问题是:我需要共享数据来订阅另一个可观察的数据。首先,我构造了一个物体乘坐,在我调用搜索方法之后 问题是我总是从数据服务获取数据,而不是从api调用。api工作导致我在存储中拦截结果,而不是在组件中。

  • 本文向大家介绍system.reactive 订阅/取消订阅可观察对象(IDisposable),包括了system.reactive 订阅/取消订阅可观察对象(IDisposable)的使用技巧和注意事项,需要的朋友参考一下 示例 订阅返回IDisposable: 当您准备取消订阅时,只需处置订阅即可:            

  • 我想调用一个函数(同步),然后使用它的返回值作为初始发射(随后链接一些其他运算符上的结果可观察)。 我想在订阅期间调用这个函数,所以我不能只使用

  • 我有这个问题,我一直在寻找,但找不到解决方案(或者也许我不能根据其他答案做出解决方案)。 我的问题是,我需要找到一种方法来等待可观察的(有自己的订户)并等待另一个可观察的(有自己的订户)完成。 场景是这样的: 奥布1- 奥布斯2 - 我主要担心的是我需要两个订阅者。在我看来,obs1 和 obs2 并行运行,但需要检查 obs1 是否以新的会话令牌完成。也许这不是RxJava的主要目的。 Obs1

  • 我是这样理解的,从可观察的角度来看: > 有人订阅了我,我应该开始发送项目 [订阅者:1][要发送的项目:1,2,3] 向订阅服务器发送项“1” [订阅服务器:1][要发送的项:2,3] ... 但它不是这样运作的。就像它们是两个独立的可观测物在一个。这让我很困惑,为什么他们不把项目给所有的订户? 奖金: 谢了!