当前位置: 首页 > 知识库问答 >
问题:

asyncio.wait_for超时后如何清理?

苏富
2023-03-14

我的目标是练习使用asyncio库。我已经阅读了一些入门教程,现在我想自己写一些代码。

我想开始两个简单的任务,它们基本上增加了类外存储的公共值。第一种是5秒后自动增加1。第二个任务与用户相关:如果您在这5秒内输入了一些值,那么也应该添加它。

问题是,当我没有输入任何值时,我的循环不会关闭-程序仍然处于活动状态并永远运行,直到我强制停止它-然后我得到以下错误:

2.py
[Auto_increment: ] This task will increment value after 5 seconds
[Manual increment: ] Waiting 5s for inc value:
Timeout
Loop finished. Value is 1
Error in atexit._run_exitfuncs:
Traceback (most recent call last):
  File "/usr/lib/python3.7/concurrent/futures/thread.py", line 40, in _python_exit
    t.join()
  File "/usr/lib/python3.7/threading.py", line 1032, in join
    self._wait_for_tstate_lock()
  File "/usr/lib/python3.7/threading.py", line 1048, in _wait_for_tstate_lock
    elif lock.acquire(block, timeout):
KeyboardInterrupt

Process finished with exit code 0

基本上,在“循环完成”之后,程序就结束了,但是当控制台输入没有值时,程序就挂起了。当我输入任何v

2.py
[Auto_increment: ] This task will increment value after 5 seconds
[Manual increment: ] Waiting 5s for inc value:
5
Loop finished. Value is 6

Process finished with exit code 0

看起来当TimeoutError发生时,asyncio.wait_for后有东西没有清理。你能帮我告诉我,怎么了吗?这是我的代码:

import asyncio
import sys


class ValContainer:
    _val = 0

    @staticmethod
    def inc_val(how_many=1):
        ValContainer._val += how_many

    @staticmethod
    def get_val() -> int:
        return ValContainer._val


async def auto_increment():
    print(f'[Auto_increment: ] This task will increment value after 5 seconds')
    await asyncio.sleep(5)
    ValContainer.inc_val()
    return True


async def manual_increment(loop):
    print(f'[Manual increment: ] Waiting 5s for inc value:')
    try:
        future = loop.run_in_executor(None, sys.stdin.readline)
        line = await asyncio.wait_for(future, 5, loop=loop)
        if line:
            try:
                how_many = int(line)
                ValContainer.inc_val(how_many)
            except ValueError:
                print('That\'s not a number!')

    except asyncio.TimeoutError:
        print('Timeout')
    finally:
        return True

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    task_auto = loop.create_task(auto_increment())
    task_man = loop.create_task(manual_increment(loop))
    loop.run_until_complete(task_auto)
    loop.run_until_complete(task_man)
    print(f'Loop finished. Value is {ValContainer.get_val()}')
    loop.close()

共有1个答案

龚昊然
2023-03-14

您已经在threadpoolexecutor中启动了一个单独的线程,这些线程实际上无法取消。任务asyncio“委托”被取消,但sys被取消。斯丁。readline呼叫将无限期地挂在那里。您可以通过按enter键结束它,因为这会在sys上显示完整的一行。stdin

您必须使用一种变通方法来取消此处的读取;请注意,您不能告诉ThreadPoolExecutor使用守护进程线程。

在异步IO上下文中作为单独任务等待用户输入的情况下,创建自己的线程可能比让THreadPoolExecutor为您管理线程更容易,因此您可以在该线程上设置daemon=True,让进程在退出时杀死该线程。

 类似资料:
  • 问题内容: 有没有办法清除给定窗口中的所有超时?我认为超时存储在对象中的某个位置,但无法确认。 任何跨浏览器解决方案都是受欢迎的。 问题答案: 它们不在window对象中,但是具有id,这些id是连续的整数。 因此,您可以像这样清除所有超时:

  • 问题内容: 我正在使用共享内存的程序上工作。所述程序的多个实例将连接到现有程序或重新创建它,并在没有其他进程的情况下将其返回给OS,或者只是将其分离并终止。我想到了使用一个简单的计数器来跟踪使用它的进程数。 我正在使用函数进行清理,但是afaik在收到SIGKILL信号后,进程将不会进行任何清理,因此,如果这些进程中的任何一个不能正常终止,我可能永远无法清理内存。 有没有办法指定即使在SIGKIL

  • 我正在为如何处理这个特殊案件而挣扎。我知道我可以通过链式回调来解决这个问题,但它看起来就像是promise的海报: 我有一个父方法,它需要按顺序执行三个异步操作(特别是从用户那里得到确认)。我们称它们为func1 func2和func3。现在,我可以让每一个都返回一个promise,并将其链接起来,这一切都非常有效。我遇到的问题是: func1需要设置一个状态,等待链的其余部分运行,然后取消设置该

  • 问题内容: 我正在尝试以这种方式使用python的多处理程序包: 从池的进程中,我要避免等待等待60多个返回结果的进程。那可能吗? 问题答案: 这是一种无需更改功能即可执行此操作的方法。需要两个步骤: 使用您可以传递的选项来确保每次执行任务后重新启动池中的工作进程。 将现有的辅助函数包装在另一个函数中,该函数将调用守护程序线程,然后等待该线程的结果数秒钟。使用守护程序线程很重要,因为进程在退出之前

  • 问题内容: 我正在使用实现一个3线程池,并使用CountDownLatch监视所有线程的完成情况,以进行进一步处理。 我曾经等到所有线程完成。我希望此过程在超时(例如45秒)的情况下中止。我该如何实施? 问题答案: 使用的重载变体接受超时。

  • 我正在使用thuncydides jbehave插件来运行Selenium测试。但是,由于 jbehave 超时,我总共无法运行超过 5 分钟的测试。我不知道如何配置thuncydides/jbehave来覆盖这个限制。硒测试过去比5分钟更长,所以这对许多人来说应该是一个实际问题。