我的理解是,asyncio.gather
旨在并发运行其参数,并且当协同程序执行等待表达式时,它为事件循环提供了安排其他任务的机会。考虑到这一点,我惊讶地发现下面的代码片段忽略了asyncio.gather
的一个输入。
import asyncio
async def aprint(s):
print(s)
async def forever(s):
while True:
await aprint(s)
async def main():
await asyncio.gather(forever('a'), forever('b'))
asyncio.run(main())
据我所知,会发生以下事情:
实际上,这不是我观察到的。相反,整个程序相当于而True: print('a')
。我发现非常有趣的是,即使是代码的微小变化似乎也会重新引入公平。例如,如果我们有下面的代码,那么我们在输出中得到一个大致相等的“a”和“b”的混合。
async def forever(s):
while True:
await aprint(s)
await asyncio.sleep(1.)
验证它似乎与我们花在vs中的时间没有任何关系,我发现下面的更改也提供了公平性。
async def forever(s):
while True:
await aprint(s)
await asyncio.sleep(0.)
有人知道为什么会发生这种不公平,以及如何避免这种不公平吗?我想当我有疑问的时候,我可以主动地在所有地方添加一个空的sleep语句,并希望这就足够了,但是我难以置信地不明白为什么原始代码没有按预期的那样运行。
如果这很重要,因为asyncio似乎经历了相当多的API更改,我在Ubuntu盒子上使用了Python 3.8.4的普通安装。
我想提请大家注意PEP 492,它说:
wait
,类似于从
产生,暂停执行[...]协程直到[...]等待完成并返回结果数据。
它使用来自实现的收益率,并额外验证其参数。
任何
从
调用链的输出都以输出
结束。这是实现Futures
的基本机制。因为在内部,协程是一种特殊的生成器,所以每个wait
都由wait
调用链的某个地方的产量
挂起(请参阅PEP 3156以获得详细解释)。
但是在你的情况下,异步def aprint()
不会产生,也就是说,它不会调用任何事件函数,比如I/O,或者只是等待睡眠(0)
,如果我们看它的源代码,只是是否产生代码:
@types.coroutine
def __sleep0():
"""Skip one event loop run cycle.
This is a private helper for 'asyncio.sleep()', used
when the 'delay' is set to 0. It uses a bare 'yield'
expression (which Task.__step knows how to handle)
instead of creating a Future object.
"""
yield
async def sleep(delay, result=None, *, loop=None):
"""Coroutine that completes after a given time (in seconds)."""
if delay <= 0:
await __sleep0()
return result
...
因此,由于永远而为True:
,我们可以说,你从产生了一个收益链,它不会以
收益
结束。
这部分是一个常见的误解。Python的wait
并不意味着向事件循环屈服控制,它意味着开始执行可等待的,允许它将我们一起挂起。所以是的,如果等待的对象选择挂起,当前的协程也会挂起,等待它的协程也会挂起,等等,一直到事件循环。但是如果等待的对象没有选择挂起,就像aprint
一样,等待它的协程也不会选择挂起。这偶尔是错误的来源,就像在这里或这里看到的。
有人知道为什么会发生这种不公平,以及如何避免这种不公平吗?
幸运的是,这种效果在与外界没有真正沟通的玩具示例中最为明显。虽然您可以通过将await asyncio.sleep(0)
添加到战略位置(甚至有文档记录强制进行上下文切换)来修复它们,但您可能不应该在生产代码中这样做。
一个真正的程序将依赖于外部世界的输入,无论是来自网络的数据、来自本地数据库的数据,还是来自由另一个线程或进程填充的工作队列的数据。实际数据很少会以如此快的速度到达,从而使程序的其余部分陷入饥饿状态,如果确实如此,那么饥饿很可能是暂时的,因为程序最终将由于其输出端的背压而暂停。程序从一个源接收数据的速度比处理数据的速度快,但仍然需要观察来自另一个源的数据,这是一种罕见的可能性,您可能会遇到饥饿问题,但如果出现这种情况,可以通过强制上下文切换来解决。(我没有听说有人在生产中遇到过。)
除了上面提到的错误之外,更经常发生的是协程调用CPU沉重或遗留阻塞代码,最终占用事件循环。这种情况应该通过将CPU/阻塞部分传递给run_in_executor
来处理。
问题内容: 我正在使用Retrofit为异步网络调用返回rxjava Observable。 我发现自己在重复以下调用: 似乎我一直在订阅IO线程,并在Android主线程上进行观察。这似乎是我发现的所有资源都倡导的最佳实践。也许除了长时间运行的计算外,我不太了解何时要偏离此模式。 有没有一种方法可以通过默认subscriptionOn和observeOn线程来删除此样板? 这是rxjava插件的
我正在使用改型为我的异步网络调用返回rxjava Observable。 我发现自己重复以下调用: 似乎我总是在IO线程上订阅,在Android主线程上观察。这似乎是我找到的所有资源都提倡的最佳实践。也许除了长时间运行的计算之外,我不太明白我们什么时候会想要偏离这种模式。 有没有办法通过默认subscribeOn和observeOn线程来删除这个样板文件? 这是rxjava插件的用例吗?(我找不到
我们有一个使用Spring Framework在Tomcat中运行的Web应用程序。我们需要为循环操作添加一些计划作业。为此,我们遇到了Quartz Scheduler,并遵循了使用Quartz with Spring配置作业的教程,并按预期计划并运行了作业。 所以我们有一些任务是在应用程序启动时安排的。现在我们希望用户手动运行作业并更改作业的触发器,但是我们需要将这些更改持久化到数据库中。因此,
问题内容: 在Akka文档中指出,如果未配置调度程序,则将使用默认调度程序。默认调度程序的属性是什么,即并行度最小值,并行度因子,并行度最大值等? 问题答案: 默认情况下,提供的调度程序是带有的调度程序,并且默认的并行度值为: 最小并行度: 8 并行因子: 3.0 最大并行度: 64 您可以在文档中看到所有这些信息。 有一个节名为: 参考配置清单 这是配置文件的相关部分(我只删除了注释):
问题内容: 我想自己开发一个探查器,我想解释一下我所看到的。即使在最简单的程序中,也总是会出现一些默认线程: 销毁JavaVM 信号调度器 终结器 参考处理程序 尽管他们的名字很能说明问题,但我想获得更多信息。似乎这些线程没有记录在案,是否有人知道挖掘这些信息的来源,甚至确切地知道这些线程的作用? 问题答案: DestroyJavaVM是一个线程,该线程在程序退出时卸载Java VM。在大多数情况
问题内容: 我正在使用t和注释执行一些任务。 如何确定spring-boot中预设任务的默认池大小是多少? 原因:以下类不是并行执行作业,而是一个接一个地执行。也许默认情况下仅配置了一个线程执行程序? 结果:在第一个作业完成后执行第二个作业。 问题答案: 是的,默认情况下,所有方法共享一个线程。通过定义如下这样可以覆盖此行为: 本示例确保所有方法共享大小为100的线程池。