当前位置: 首页 > 面试题库 >

正常关闭异步协程

刁丰羽
2023-03-14
问题内容

我目前在关闭应用程序的CTRL-C期间关闭异步协程时遇到问题。下面的代码是我现在所拥有的简化版本:

#!/usr/bin/env python
# -*- coding: UTF-8 -*-
import asyncio
import time
import functools
import signal


class DummyProtocol(asyncio.Protocol):

    def __init__(self, *args, **kwargs):
        self._shutdown = asyncio.Event()
        self._response = asyncio.Queue(maxsize=1)
        super().__init__(*args, **kwargs)

    def connection_made(self, transport):
        self.transport = transport

    def close(self):
        print("Closing protocol")
        self._shutdown.set()

    def data_received(self, data):

        #data = b'OK MPD '

        # Start listening for commands after a successful handshake
        if data.startswith(b'OK MPD '):
            print("Ready for sending commands")
            self._proxy_task = asyncio.ensure_future(self._send_commands())
            return

        # saving response for later consumption in self._send_commands
        self._response.put_nowait(data)

    async def _send_commands(self):

        while not self._shutdown.is_set():

            print("Waiting for commands coming in ...")

            command = None

            # listen for commands coming in from the global command queue. Only blocking 1sec.
            try:
                command = await asyncio.wait_for(cmd_queue.get(), timeout=1)
            except asyncio.TimeoutError:
                continue

            # sending the command over the pipe
            self.transport.write(command)

            # waiting for the response. Blocking until response is complete.
            res = await self._response.get()
            # put it into the global response queue
            res_queue.put_nowait(res)


async def connect(loop):
    c = lambda: DummyProtocol()
    t = asyncio.Task(loop.create_connection(c, '192.168.1.143', '6600'))
    try:
        # Wait for 3 seconds, then raise TimeoutError
        trans, proto = await asyncio.wait_for(t, timeout=3)
        print("Connected to <192.168.1.143:6600>.")
        return proto
    except (asyncio.TimeoutError, OSError) as e:
        print("Could not connect to <192.168.1.143:6600>. Trying again ...")
        if isinstance(e, OSError):
            log.exception(e)


def shutdown(proto, loop):
    # http://stackoverflow.com/a/30766124/1230358
    print("Shutdown of DummyProtocol initialized ...")
    proto.close()
    # give the coros time to finish
    time.sleep(2)

    # cancel all other tasks
    # for task in asyncio.Task.all_tasks():
    #    task.cancel()

    # stopping the event loop
    if loop:
        print("Stopping event loop ...")
        loop.stop()

    print("Shutdown complete ...")


if __name__ == "__main__":

    loop = asyncio.get_event_loop()

    cmd_queue = asyncio.Queue()
    res_queue = asyncio.Queue()

    dummy_proto = loop.run_until_complete(connect(loop))

    for signame in ('SIGINT','SIGTERM'):
        loop.add_signal_handler(getattr(signal, signame), functools.partial(shutdown, dummy_proto, loop))

    try:
        loop.run_forever()
    except KeyboardInterrupt:
        pass
    finally:
        loop.close()

如果按CTRL-C,会给我以下输出:

Connected to <192.168.1.143:6600>.
Ready for sending commands
Waiting for commands coming in ...
Waiting for commands coming in ...
Waiting for commands coming in ...
Waiting for commands coming in ...
^CShutdown of DummyProtocol initialized ...
Closing protocol
Stopping event loop ...
Shutdown complete ...
Task was destroyed but it is pending!
task: <Task pending coro=<DummyProtocol._send_commands() running at ./dummy.py:45> wait_for=<Future pending cb=[Task._wakeup()]>>
Task was destroyed but it is pending!
task: <Task pending coro=<Queue.get() running at /usr/local/Cellar/python3/3.5.1/Frameworks/Python.framework/Versions/3.5/lib/python3.5/asyncio/queues.py:168> wait_for=<Future pending cb=[Task._wakeup()]> cb=[_release_waiter(<Future pendi...sk._wakeup()]>)() at /usr/local/Cellar/python3/3.5.1/Frameworks/Python.framework/Versions/3.5/lib/python3.5/asyncio/tasks.py:344]>
Exception ignored in: <generator object Queue.get at 0x10594b468>
Traceback (most recent call last):
  File "/usr/local/Cellar/python3/3.5.1/Frameworks/Python.framework/Versions/3.5/lib/python3.5/asyncio/queues.py", line 170, in get
  File "/usr/local/Cellar/python3/3.5.1/Frameworks/Python.framework/Versions/3.5/lib/python3.5/asyncio/futures.py", line 227, in cancel
  File "/usr/local/Cellar/python3/3.5.1/Frameworks/Python.framework/Versions/3.5/lib/python3.5/asyncio/futures.py", line 242, in _schedule_callbacks
  File "/usr/local/Cellar/python3/3.5.1/Frameworks/Python.framework/Versions/3.5/lib/python3.5/asyncio/base_events.py", line 447, in call_soon
  File "/usr/local/Cellar/python3/3.5.1/Frameworks/Python.framework/Versions/3.5/lib/python3.5/asyncio/base_events.py", line 456, in _call_soon
  File "/usr/local/Cellar/python3/3.5.1/Frameworks/Python.framework/Versions/3.5/lib/python3.5/asyncio/base_events.py", line 284, in _check_closed
RuntimeError: Event loop is closed

我对asyncio的经验不是很丰富,所以我很确定我在这里缺少重要的东西。真正让我头疼的是输出之后的部分Shutdown complete ...。从开始Task was destroyed but it is pending!,我必须承认我不知道发生了什么。我查看了其他问题,但无法正常工作。那么,为什么这段代码输出类似的东西Task was destroyed but it is pending! aso.?如何干净地关闭协程呢?

谢谢你的帮助!


问题答案:

什么Task was destroyed but it is pending!意思

如果目前您的程序已完成一些异步任务,但仍未完成,则会收到此警告。之所以需要此警告,是因为某些正在运行的任务可能无法正确释放某些资源。

有两种常见的解决方法:

  1. 您可以等待任务自己完成
  2. 您可以取消任务,等待任务完成

异步和阻止同步操作

让我们来看一下代码:

def shutdown(proto, loop):
    print("Shutdown of DummyProtocol initialized ...")
    proto.close()

    time.sleep(2)
    # ...

time.sleep(2)-这行不会给协程足够的时间。它将冻结所有程序两秒钟。在这段时间内什么都不会发生。

发生这种情况是因为事件循环在您调用的同一进程中运行time.sleep(2)。您永远不要在异步程序中以这种方式调用长时间运行的同步操作。请阅读此答案以查看异步代码如何工作。

我们如何等待任务完成

让我们尝试修改shutdown功能。这不是异步函数,您不能await在其中添加任何内容。要执行一些异步代码,我们需要手动执行:停止当前正在运行的循环(因为它已经在运行),创建一些异步函数来等待任务完成,并传递此函数以在事件循环中执行。

def shutdown(proto, loop):
    print("Shutdown of DummyProtocol initialized ...")

    # Set shutdown event: 
    proto.close()

    # Stop loop:
    loop.stop()

    # Find all running tasks:
    pending = asyncio.Task.all_tasks()

    # Run loop until tasks done:
    loop.run_until_complete(asyncio.gather(*pending))

    print("Shutdown complete ...")

您也可以取消任务并等待它们完成。有关详细信息,请参见此答案。

清理作业地点

我对信号并不陌生,但是您真的需要它来捕获CTRL-
C吗?无论何时KeyboardInterrupt发生的事件,都会在运行事件循环的地方逐行抛出(在代码中为loop.run_forever())。我在这里可能是错的,但是处理这种情况的常用方法是将所有清理操作都置于finally阻塞状态。

例如,您可以看到aiohttp它是如何进行的:

try:
    loop.run_forever()
except KeyboardInterrupt:  # pragma: no branch
    pass
finally:
    srv.close()
    loop.run_until_complete(srv.wait_closed())
    loop.run_until_complete(app.shutdown())
    loop.run_until_complete(handler.finish_connections(shutdown_timeout))
    loop.run_until_complete(app.cleanup())
loop.close()


 类似资料:
  • 在从Jetty服务器下载文件的客户端Ant任务中,我偶尔会收到 我在谷歌上搜索了一段时间,但到目前为止,我还没有一个结论性的答案来解释为什么会发生这种情况。 谁能解释一下这个例外的根本原因是什么? 我的Jetty日志似乎没有这个异常的等效痕迹。然而,Jetty服务器似乎确实正在终止安全连接。 作为一个背景——当蚂蚁任务产生的两个客户端使用相同的证书从码头服务器下载预定的文件时,我看到了这个异常。我

  • 我正在使用Spring任务执行框架执行一个任务。为此,我用@Async注释注释了我的方法,并将以下内容添加到基于XML的应用程序上下文中: 在本例中,我们想知道这个执行器的shutdown方法是如何被调用的?我想确保我的应用不会永远等待这个线程池。 我可以(而不是使用任务名称空间)将我的执行器定义为bean,然后将其destroy方法设置为“shutdown”,但不知道任务名称空间定义样式。 有什

  • 问题内容: 这是我目前拥有的代码: 一切都会按预期进行(调用writeToFile方法时将文件写入)。但是,当第二次调用writeToFile方法时,出现以下错误: 该文件仍按预期第二次写入,但是它将在第二次以及以后对writeToFile()的调用中引发此错误。我想知道是什么导致此错误发生。 问题答案: 写完后就在打电话。流关闭后,将无法再次写入。通常,实现此目标的方法是将结束状态移至write

  • 问题内容: 在我的模型中,具有获取数据的功能,该数据需要完成处理程序作为参数: 它正在调用另一个函数,该函数执行联系人的异步加载,我将完成情况转发到该函数 具有完成的调用如下所示: 有时这是可行的,但是执行的顺序常常不是我期望的那样。问题是,有时下的范围之前执行下结束了。 这是为什么?如何确保在之后开始执行? 问题答案: 一些观察: 它总是执行2之前1处的值。获得描述的行为的唯一方法是,如果要在f

  • 要求我们推荐或查找工具、库或最喜欢的场外资源的问题与Stack Overflow无关,因为它们往往会吸引固执己见的答案和垃圾邮件。相反,描述问题以及迄今为止为解决它所做的工作。 我正在寻找类似以下代码的东西。我知道下面的代码显然是错误的,我只是想明确我在寻找什么。 **必须在后台工作并使用回调函数实现 **它必须能够在单个活动中多次调用。AsyncTask不是我想要的东西。 **我不介意开源库。只

  • 问题内容: 在我的webapp中,我创建了一个使用固定大小的ThreadPool的服务。我在整个应用程序生命周期中都重复使用了相同的代码。 所有人都在Tomcat中运行,这在关闭时给我以下错误: 我确实意识到我需要在关闭tomcat之前先关闭ExecutorService。Soms SO线程已经在谈论这一点,但是我找不到一种干净的方法来解决这个问题。 是否应该在正常关闭线程和执行器的情况下使用建议