当前位置: 首页 > 面试题库 >

使用asyncio处理超时

束研
2023-03-14
问题内容

免责声明:这是我第一次尝试该asyncio模块。

我使用asyncio.wait以下方式尝试支持超时功能,以等待一组异步任务的所有结果。这是更大的库的一部分,因此我省略了一些不相关的代码。

请注意,该库已经支持通过ThreadPoolExecutors和ProcessPoolExecutors提交任务和使用超时,因此,我对使用这些代替建议或关于为什么要使用的问题并不真正感兴趣asyncio。转到代码…

import asyncio
from contextlib import suppress

...

class AsyncIOSubmit(Node):
    def get_results(self, futures, timeout=None):
        loop = asyncio.get_event_loop()
        finished, unfinished = loop.run_until_complete(
            asyncio.wait(futures, timeout=timeout)
        )
        if timeout and unfinished:
            # Code options in question would go here...see below.
            raise asyncio.TimeoutError

起初,我不必担心在超时时取消待处理的任务,但是随后Task was destroyed but it is pending!在程序退出或时收到警告loop.close。经过研究后,我发现了多种取消任务并等待它们被实际取消的方法

选项1:

[task.cancel() for task in unfinished]
for task in unfinished:
    with suppress(asyncio.CancelledError):
        loop.run_until_complete(task)

选项2:

[task.cancel() for task in unfinished]
loop.run_until_complete(asyncio.wait(unfinished))

选项3:

# Not really an option for me, since I'm not in an `async` method
# and don't want to make get_results an async method.
[task.cancel() for task in unfinished]
for task in unfinished:
    await task

选项4:

像这样的答案中的某种while循环。似乎我的其他选择更好,但包括完整性。

到目前为止,选项1和2似乎都可以正常工作。每种选择都可能是“正确的”,但是随着asyncio这些年来的发展,网络上的示例和建议已经过时或相差很大。所以我的问题是

问题1

选项1和2之间有实际区别吗?我知道run_until_complete它将一直运行到将来完成为止,所以由于选项1以特定的顺序循环,所以我认为如果较早的任务需要更长的时间才能完成,它的行为可能会有所不同。我尝试查看asyncio源代码以了解其在后台asyncio.wait的任务/功能是否有效地完成了同样的事情,但这并不明显。

问题2

我认为,如果一项任务处于长时间运行的阻塞操作中间,它实际上可能不会立即取消吗?也许这仅取决于所使用的基础操作或库是否会立即引发CancelledError?为异步设计的库也许永远不会发生这种情况?

由于我试图在此处实现超时功能,因此我对此有些敏感。如果可能的话,这些事情可能需要很长时间才能取消,所以我会考虑致电cancel而不是等待它真正发生,或者设置一个很短的超时时间以等待取消完成。

问题3

是否有可能loop.run_until_complete(或者实际上是对的基础调用async.waitunfinished出于超时以外的原因返回值?如果是这样,我显然必须调整一下逻辑,但是从文档看来这是不可能的。


问题答案:

选项1和2之间有实际区别吗?

否。选项2看起来更好,效率可能略高,但是它们的最终效果是相同的。

我知道run_until_complete它将一直运行到将来完成为止,所以由于选项1以特定的顺序循环,所以我认为如果较早的任务需要更长的时间才能完成,它的行为可能会有所不同。

一开始似乎是这样,但实际上并非如此,因为loop.run_until_complete运行提交到循环的 所有
任务,而不仅仅是运行作为参数传递的任务。它仅在提供的等待完成时 停止
-这就是“运行直到完成”的意思。调用run_until_complete已调度任务的循环类似于以下异步代码:

ts = [asyncio.create_task(asyncio.sleep(i)) for i in range(1, 11)]
# takes 10s, not 55s
for t in ts:
    await t

在语义上等效于以下线程代码:

ts = []
for i in range(1, 11):
    t = threading.Thread(target=time.sleep, args=(i,))
    t.start()
    ts.append(t)
# takes 10s, not 55s
for t in ts:
    t.join()

换句话说,await trun_until_complete(t)块,直到t已完成,但让一切-
如使用先前计划的任务asyncio.create_task()在这段时间以及运行。因此,总运行时间将等于最长任务的运行时间,而不是总和。例如,如果第一个任务花费很长时间,那么所有其他任务都会在此期间完成,并且等待的对象根本无法入睡。

所有这些仅适用于先前已计划的等待任务。如果您尝试将其应用于协程,它将无法正常工作:

# runs for 55s, as expected
for i in range(1, 11):
    await asyncio.sleep(i)

# also 55s - we didn't call create_task() so it's equivalent to the above
ts = [asyncio.sleep(i) for i in range(1, 11)]
for t in ts:
    await t

# also 55s
for i in range(1, 11):
   t = threading.Thread(target=time.sleep, args=(i,))
   t.start()
   t.join()

这通常是asyncio初学者的症结所在,他们编写了与最后一个asyncio示例等效的代码,并期望它可以并行运行。

我尝试查看asyncio源代码以了解其在后台asyncio.wait的任务/功能是否有效地完成了同样的事情,但这并不明显。

asyncio.wait 只是一个方便的API,可完成两件事:

  • 将输入参数转换为实现的东西Future。对于协程,这意味着它将协程提交到事件循环,就像使用一样create_task,这使它们可以独立运行。如果您像开始那样给它任务,那么将跳过此步骤。
  • 用于add_done_callback在期货交易完成时得到通知,届时它将恢复其呼叫者。

是的,它执行相同的操作,但是实现方式不同,因为它支持更多功能。

我认为,如果一项任务处于长时间运行的阻塞操作中间,它实际上可能不会立即取消吗?

在异步中,不应有“阻塞”操作,只有那些挂起的操作才应该被取消。例外是使用阻止附加到asyncio的代码run_in_executor,其中根本不会取消基础操作,但是asyncio协程将立即获取该异常。

也许这仅取决于所使用的基础操作或库是否会立即引发CancelledError?

库不 提高 CancelledError,它在取消发生之前暂停的等待点 接收 它。对于图书馆而言,取消的效果是await ...中断其等待并立即引发CancelledError。除非被捕获,否则异常将通过函数传播并一直await调用顶级协程,协程的提升CancelledError将整个任务标记为已取消。行为良好的异步代码将执行此操作,可能会finally用于释放它们持有的OS级资源。当CancelledError被捕获时,代码可以选择不重新引发它,在这种情况下,取消被有效地忽略了。

是否有可能loop.run_until_complete(或者实际上是对的基础调用async.wait)由于超时以外的原因而返回未完成的值?

如果您使用的return_when=asyncio.ALL_COMPLETE是默认设置,那是不可能的。使用很有可能return_when=FIRST_COMPLETED,然后显然有可能独立于超时。



 类似资料:
  • 问题内容: 是否可以为Alamofire请求添加超时处理程序? 在我的项目中,我以这种方式使用Alamofire: 编辑: 请求失败消息 错误域= NSURLErrorDomain代码= -1001“请求超时。” UserInfo = {NSUnderlyingError = 0x7fc10b937320 {Error Domain = kCFErrorDomainCFNetwork Code =

  • 是否可以为Alamofire请求添加超时处理程序? 在我的项目中,我使用Alamofire的方式如下: 编辑: 请求失败消息 Error Domain=NSURLErrorDomain Code=-1001“请求超时。”UserInfo={NSUnderlyingError=0x7fc10b937320{Error Domain=kCFErrorDomainCFNetwork Code=-1001

  • 问题内容: 我有以下情况: Python 3.6+ 从文件中逐行读取输入数据。 协程将数据发送到API(使用),并将调用结果保存到Mongo(使用)。因此,有很多IO正在进行。 该代码使用/编写,并且对于手动执行的单个调用也可以正常工作。 我不知道该怎么做,就是要大量使用输入数据。 我看到的所有示例都通过发送有限列表作为参数进行了演示。但是我不能简单地向它发送任务列表,因为输入文件可能有数百万行。

  • 问题内容: 我试图用一个批量一些 KTable 值,并送他们。似乎30秒钟超出了使用者超时间隔,在此间隔之后,Kafka认为该使用者已失效并释放了分区。 我尝试提高 轮询 和 提交间隔 的频率来避免这种情况: 不幸的是,这些错误仍在发生: (很多) 其次是: 显然,我需要更频繁地将心跳发送回服务器。怎么样? 我的拓扑是: 该 KTable 是关键,每30秒分组值。在 Processor.init(

  • 前言 sender的run 调用done方法,并传入TimeoutException

  • 问题内容: 我正在urllib2的urlopen中使用timeout参数。 我如何告诉Python,如果超时到期,应该引发自定义错误? 有任何想法吗? 问题答案: 在极少数情况下要使用。这样做会捕获可能很难调试的 任何 异常,并且会捕获包括和在内的异常,这些异常会使您的程序恼人。 最简单的说,您会发现: 以下内容应捕获连接超时时引发的特定错误: