当前位置: 首页 > 知识库问答 >
问题:

asyncio的默认调度程序何时公平?

荆城
2023-03-14

我的理解是,asyncio.gather旨在并发运行其参数,并且当协同程序执行等待表达式时,它为事件循环提供了安排其他任务的机会。考虑到这一点,我惊讶地发现下面的代码片段忽略了asyncio.gather的一个输入。

import asyncio                                                             
  
async def aprint(s):
    print(s)

async def forever(s):
    while True:
        await aprint(s)

async def main():
    await asyncio.gather(forever('a'), forever('b'))

asyncio.run(main())

据我所知,会发生以下事情:

  1. run(main())对事件循环执行任何必要的全局初始化,并安排main()执行。

实际上,这不是我观察到的。相反,整个程序相当于而True: print('a')。我发现非常有趣的是,即使是代码的微小变化似乎也会重新引入公平。例如,如果我们有下面的代码,那么我们在输出中得到一个大致相等的“a”和“b”的混合。

async def forever(s):
    while True:
        await aprint(s)
        await asyncio.sleep(1.)

验证它似乎与我们花在vs中的时间没有任何关系,我发现下面的更改也提供了公平性。

async def forever(s):
    while True:
        await aprint(s)
        await asyncio.sleep(0.)

有人知道为什么会发生这种不公平,以及如何避免这种不公平吗?我想当我有疑问的时候,我可以主动地在所有地方添加一个空的sleep语句,并希望这就足够了,但是我难以置信地不明白为什么原始代码没有按预期的那样运行。

如果这很重要,因为asyncio似乎经历了相当多的API更改,我在Ubuntu盒子上使用了Python 3.8.4的普通安装。

共有2个答案

羊舌志
2023-03-14

我想提请大家注意PEP 492,它说:

wait,类似于产生,暂停执行[...]协程直到[...]等待完成并返回结果数据。

它使用来自实现的收益率,并额外验证其参数。

任何调用链的输出都以输出结束。这是实现Futures的基本机制。因为在内部,协程是一种特殊的生成器,所以每个wait都由wait调用链的某个地方的产量挂起(请参阅PEP 3156以获得详细解释)。

但是在你的情况下,异步def aprint()不会产生,也就是说,它不会调用任何事件函数,比如I/O,或者只是等待睡眠(0),如果我们看它的源代码,只是是否产生代码

@types.coroutine
def __sleep0():
    """Skip one event loop run cycle.

    This is a private helper for 'asyncio.sleep()', used
    when the 'delay' is set to 0.  It uses a bare 'yield'
    expression (which Task.__step knows how to handle)
    instead of creating a Future object.
    """
    yield


async def sleep(delay, result=None, *, loop=None):
    """Coroutine that completes after a given time (in seconds)."""
    if delay <= 0:
        await __sleep0()
        return result
...

因此,由于永远而为True:,我们可以说,你从产生了一个收益链,它不会以收益结束。

方昊
2023-03-14

这部分是一个常见的误解。Python的wait并不意味着向事件循环屈服控制,它意味着开始执行可等待的,允许它将我们一起挂起。所以是的,如果等待的对象选择挂起,当前的协程也会挂起,等待它的协程也会挂起,等等,一直到事件循环。但是如果等待的对象没有选择挂起,就像aprint一样,等待它的协程也不会选择挂起。这偶尔是错误的来源,就像在这里或这里看到的。

有人知道为什么会发生这种不公平,以及如何避免这种不公平吗?

幸运的是,这种效果在与外界没有真正沟通的玩具示例中最为明显。虽然您可以通过将await asyncio.sleep(0)添加到战略位置(甚至有文档记录强制进行上下文切换)来修复它们,但您可能不应该在生产代码中这样做。

一个真正的程序将依赖于外部世界的输入,无论是来自网络的数据、来自本地数据库的数据,还是来自由另一个线程或进程填充的工作队列的数据。实际数据很少会以如此快的速度到达,从而使程序的其余部分陷入饥饿状态,如果确实如此,那么饥饿很可能是暂时的,因为程序最终将由于其输出端的背压而暂停。程序从一个源接收数据的速度比处理数据的速度快,但仍然需要观察来自另一个源的数据,这是一种罕见的可能性,您可能会遇到饥饿问题,但如果出现这种情况,可以通过强制上下文切换来解决。(我没有听说有人在生产中遇到过。)

除了上面提到的错误之外,更经常发生的是协程调用CPU沉重或遗留阻塞代码,最终占用事件循环。这种情况应该通过将CPU/阻塞部分传递给run_in_executor来处理。

 类似资料:
  • 问题内容: 我正在使用Retrofit为异步网络调用返回rxjava Observable。 我发现自己在重复以下调用: 似乎我一直在订阅IO线程,并在Android主线程上进行观察。这似乎是我发现的所有资源都倡导的最佳实践。也许除了长时间运行的计算外,我不太了解何时要偏离此模式。 有没有一种方法可以通过默认subscriptionOn和observeOn线程来删除此样板? 这是rxjava插件的

  • 我正在使用改型为我的异步网络调用返回rxjava Observable。 我发现自己重复以下调用: 似乎我总是在IO线程上订阅,在Android主线程上观察。这似乎是我找到的所有资源都提倡的最佳实践。也许除了长时间运行的计算之外,我不太明白我们什么时候会想要偏离这种模式。 有没有办法通过默认subscribeOn和observeOn线程来删除这个样板文件? 这是rxjava插件的用例吗?(我找不到

  • 我们有一个使用Spring Framework在Tomcat中运行的Web应用程序。我们需要为循环操作添加一些计划作业。为此,我们遇到了Quartz Scheduler,并遵循了使用Quartz with Spring配置作业的教程,并按预期计划并运行了作业。 所以我们有一些任务是在应用程序启动时安排的。现在我们希望用户手动运行作业并更改作业的触发器,但是我们需要将这些更改持久化到数据库中。因此,

  • 问题内容: 在Akka文档中指出,如果未配置调度程序,则将使用默认调度程序。默认调度程序的属性是什么,即并行度最小值,并行度因子,并行度最大值等? 问题答案: 默认情况下,提供的调度程序是带有的调度程序,并且默认的并行度值为: 最小并行度: 8 并行因子: 3.0 最大并行度: 64 您可以在文档中看到所有这些信息。 有一个节名为: 参考配置清单 这是配置文件的相关部分(我只删除了注释):

  • 问题内容: 我想自己开发一个探查器,我想解释一下我所看到的。即使在最简单的程序中,也总是会出现一些默认线程: 销毁JavaVM 信号调度器 终结器 参考处理程序 尽管他们的名字很能说明问题,但我想获得更多信息。似乎这些线程没有记录在案,是否有人知道挖掘这些信息的来源,甚至确切地知道这些线程的作用? 问题答案: DestroyJavaVM是一个线程,该线程在程序退出时卸载Java VM。在大多数情况

  • 问题内容: 我正在使用t和注释执行一些任务。 如何确定spring-boot中预设任务的默认池大小是多少? 原因:以下类不是并行执行作业,而是一个接一个地执行。也许默认情况下仅配置了一个线程执行程序? 结果:在第一个作业完成后执行第二个作业。 问题答案: 是的,默认情况下,所有方法共享一个线程。通过定义如下这样可以覆盖此行为: 本示例确保所有方法共享大小为100的线程池。