当前位置: 首页 > 面试题库 >

使用asyncio创建两个并发异步任务

子车雅珺
2023-03-14
问题内容

我需要创建一个可以同时从Web套接字和管道接收消息的软件,并在另一个通道上发送消息(它从套接字接收消息,创建一个新线程并发送到管道。以与从管道接收消息相同的方式,创建一个新线程并发送到套接字。

我在使用多线程时遇到问题,在程序启动时,我必须启动方法socket_receiverpipe_receiver但只能启动pipe_receiver。我尝试删除所有代码并仅保留socket_receiverpipe_receiver但仅输入while Truepipe_receiver

import asyncio
import sys
import json
from concurrent.futures.thread import ThreadPoolExecutor
import websockets

# make the Pool of workers
executor = ThreadPoolExecutor(max_workers=10)
# Make connection to socket and pipe
header = {"Authorization": r"Basic XXXX="}
connection = websockets.connect('wss://XXXXXXXX', extra_headers=header)


async def socket_receiver():
    """Listening from web socket"""
    async with connection as web_socket:
        while True:
            message = await web_socket.recv()
            # send the message to the pipe in a new thread
            executor.submit(send_to_pipe(message))


async def pipe_receiver():
    """Listening from pipe"""
    while True:
        message = sys.stdin.readline()
        if not message:
            break
        executor.submit(send_to_socket(message))
        # jsonValue = json.dump(str(line), file);
        sys.stdout.flush()


def send_to_pipe(message):
    # Check if message is CAM or DENM
    json_message = json.loads(message)
    type = int(json_message["header"]["messageID"])
    # 1 is DENM message, 2 is CAM message
    if type == 1  or type == 2:
        # send the message to the pipe
        sys.stdout.print(json_message);


async def send_to_socket(message):
     async with connection as web_socket:
        json_message = json.dumps(message)
        await web_socket.send(json_message)


asyncio.get_event_loop().run_until_complete(
    asyncio.gather(socket_receiver(),pipe_receiver()))

该程序由子进程调用,父进程通过连接到stdout和stdin的管道与之通信。

更新:我收到@Martijn Pieters代码的异常

Traceback (most recent call last):
  File "X", line 121, in <module>
    main()
  File "X", line 119, in main
    loop.run_until_complete(asyncio.gather(socket_coro, pipe_coro))
  File "X\AppData\Local\Programs\Python\Python37-32\lib\asyncio\base_events.py", line 568, in run_until_complete
    return future.result()
  File "X", line 92, in connect_pipe
    reader, writer = await stdio()
  File "X", line 53, in stdio
    lambda: asyncio.StreamReaderProtocol(reader), sys.stdin)
  File "X/AppData\Local\Programs\Python\Python37-32\lib\asyncio\base_events.py", line 1421, in connect_read_pipe
    transport = self._make_read_pipe_transport(pipe, protocol, waiter)
  File "X/AppData\Local\Programs\Python\Python37-32\lib\asyncio\base_events.py", line 433, in _make_read_pipe_transport
    raise NotImplementedError
NotImplementedError

问题答案:

您使用的方式不ThreadPoolExecutor正确,您真的不想在这里使用它。相反,您需要设置使用者和生产者以使用队列来处理套接字和管道以在它们之间发送消息。

  • 对于每种连接类型,创建一个协程以创建连接,然后将该单个连接传递给该连接的使用者和生产者 任务 (使用创建asyncio.create_task())。用于使用asyncio.wait()来运行两个任务return_when=asyncio.FIRST_COMPLETED,因此当两个任务之一完成“提前”(例如失败)时,您可以取消任何仍在运行的任务。

  • 使用队列将消息从一个使用方传递到另一个连接的生成方。

  • sys.stdinsys.stdout阻塞 流,不只是读,写他们!请参阅https://gist.github.com/nathan-hoad/8966377,以了解尝试设置非阻塞STDIO流的要点,以及此异步问题,要求提供非阻塞流功能。

  • 不要使用全局套接字连接,当然不要使用两个单独的async with语句。您的send_to_socket()方法实际上将 关闭 套接字,因为在async with connection as web_socket:发送第一条消息时上下文管理器会退出,这会导致socket_receiver代码问题(假定套接字无限期保持打开状态)。

  • 不要在这里使用线程!您的连接完全由asyncio进行管理,因此,线程连接会因此而脚。

  • asyncio.Executor()实例只能与常规可调用对象一起使用, 而不能 与协程一起使用。Executor.submit()声明它需要一个可调用的,在协程中传递executor.submit(send_to_pipe(message))executor.submit(send_to_socket(message))会导致异常,因为协程不是可调用的。您可能没有看到异常消息,因为该异常在另一个线程中引发。

这就是socket_receiver()协程失败的原因。它当然 开始,
但是尝试发送消息失败。当我在本地模拟的websocket服务器上运行您的代码时,将显示警告:

    RuntimeWarning: coroutine 'send_to_socket' was never awaited
  executor.submit(send_to_socket(message))

当不等待协程时,该协程中的代码将永远不会执行。将协程包装成一个输出stderr(try: callable(), except Exception: traceback.print_exc(file=sys.stderr)))异常的包装:

    Traceback (most recent call last):
  File "soq52219672.py", line 15, in log_exception
    callable()
TypeError: 'coroutine' object is not callable

执行程序应仅用于集成无法转换为使用协程的代码。执行者管理该代码使其与asyncio任务并行运行而不会受到干扰。如果该代码想与asyncio任务交互,始终使用asyncio.run_coroutine_threadsafe()asyncio.call_soon_threadsafe()跨越边界调用,则应格外小心。请参见
并发和多线程 部分

这是一个示例,该示例stdio()基于基于Nathan Hoad主题的主题,以及在支持对stdio作为管道的支持有限的Windows的基础上,如何重写您的代码以使用使用者/生产者模式:

import asyncio
import json
import os
import sys

import websockets

async def socket_consumer(socket, outgoing):
    # take messages from the web socket and push them into the queue
    async for message in socket:
        await outgoing.put(message)

async def socket_producer(socket, incoming):
    # take messages from the queue and send them to the socket
    while True:
        message = await incoming.get()
        jsonmessage = json.dumps(message)
        await socket.send(jsonmessage)

async def connect_socket(incoming, outgoing):
    header = {"Authorization": r"Basic XXXX="}
    uri = 'wss://XXXXXXXX'
    async with websockets.connect(uri, extra_headers=header) as websocket:
        # create tasks for the consumer and producer. The asyncio loop will
        # manage these independently
        consumer_task = asyncio.create_task(socket_consumer(websocket, outgoing))
        producer_task = asyncio.create_task(socket_producer(websocket, incoming))

        # start both tasks, but have the loop return to us when one of them
        # has ended. We can then cancel the remainder
        done, pending = await asyncio.wait(
            [consumer_task, producer_task],
            return_when=asyncio.FIRST_COMPLETED
        )
        for task in pending:
            task.cancel()
        # force a result check; if there was an exception it'll be re-raised
        for task in done:
            task.result()


# pipe support
async def stdio(loop=None):
    if loop is None:
        loop = asyncio.get_running_loop()

    if sys.platform == 'win32':
        # no support for asyncio stdio yet on Windows, see https://bugs.python.org/issue26832
        # use an executor to read from stdio and write to stdout
        class Win32StdinReader:
            def __init__(self):
                self.stdin = sys.stdin.buffer 
            async def readline():
                # a single call to sys.stdin.readline() is thread-safe
                return await loop.run_in_executor(None, self.stdin.readline)

        class Win32StdoutWriter:
            def __init__(self):
                self.buffer = []
                self.stdout = sys.stdout.buffer
            def write(self, data):
                self.buffer.append(data)
            async def drain(self):
                data, self.buffer = self.buffer, []
                # a single call to sys.stdout.writelines() is thread-safe
                return await loop.run_in_executor(None, sys.stdout.writelines, data)

        return Win32StdinReader(), Win32StdoutWriter()

    reader = asyncio.StreamReader()
    await loop.connect_read_pipe(
        lambda: asyncio.StreamReaderProtocol(reader),
        sys.stdin
    )

    writer_transport, writer_protocol = await loop.connect_write_pipe(
        asyncio.streams.FlowControlMixin,
        os.fdopen(sys.stdout.fileno(), 'wb')
    )
    writer = asyncio.streams.StreamWriter(writer_transport, writer_protocol, None, loop)

    return reader, writer

async def pipe_consumer(pipereader, outgoing):
    # take messages from the pipe and push them into the queue
    while True:
        message = await pipereader.readline()
        if not message:
            break
        await outgoing.put(message.decode('utf8'))

async def pipe_producer(pipewriter, incoming):
    # take messages from the queue and send them to the pipe
    while True:
        jsonmessage = await incoming.get()
        message = json.loads(jsonmessage)
        type = int(message.get('header', {}).get('messageID', -1))
        # 1 is DENM message, 2 is CAM message
        if type in {1, 2}:
            pipewriter.write(jsonmessage.encode('utf8') + b'\n')
            await pipewriter.drain()

async def connect_pipe(incoming, outgoing):
    reader, writer = await stdio()
    # create tasks for the consumer and producer. The asyncio loop will
    # manage these independently
    consumer_task = asyncio.create_task(pipe_consumer(reader, outgoing))
    producer_task = asyncio.create_task(pipe_producer(writer, incoming))

    # start both tasks, but have the loop return to us when one of them
    # has ended. We can then cancel the remainder
    done, pending = await asyncio.wait(
        [consumer_task, producer_task],
        return_when=asyncio.FIRST_COMPLETED
    )
    for task in pending:
        task.cancel()
    # force a result check; if there was an exception it'll be re-raised
    for task in done:
        task.result()

async def main():
    pipe_to_socket = asyncio.Queue()
    socket_to_pipe = asyncio.Queue()

    socket_coro = connect_socket(pipe_to_socket, socket_to_pipe)
    pipe_coro = connect_pipe(socket_to_pipe, pipe_to_socket)

    await asyncio.gather(socket_coro, pipe_coro)

if __name__ == '__main__':
    asyncio.run(main())

然后从两个任务开始,一个任务管理套接字,另一个任务管理STDIO管道。两家公司都分别为其消费者和生产者启动了2个任务。有两个队列将消息从一个的消费者发送到另一个的生产者。



 类似资料:
  • 问题内容: 我正在尝试使用Python 3的相对较新的模块正确理解和实现两个同时运行的对象。 简而言之,asyncio似乎旨在处理事件循环中的异步过程和并发执行。它促进了(应用于异步函数中)作为无回调方式等待和使用结果的使用,而不会阻塞事件循环。(期货和回调仍然是可行的选择。) 它还提供了该类,该类 是用于包装协程的专门子类。最好通过使用该方法来调用。异步任务的预期用途是允许独立运行的任务与同一事

  • 我试图正确理解和实现两个并发运行的任务对象使用Python 3相对较新的异步模块。 简而言之,asyncio似乎被设计用于处理异步进程和事件循环上的并发执行。它提倡使用(应用于异步函数)作为无回调的方式来等待和使用结果,而不阻塞事件循环。(期货和回调仍然是可行的选择。) 它还提供了

  • 8.2 使用异步任务 注意:本节所介绍的功能要求 vim 编译包括 +job 特性。 8.2.1 简单任务体验 前文说到,Vim 的异步任务主要是针对外部命令的。那我们就先以最简单最常见的系统命 令 ls 为例,其功能是列出当前目录下的文件,若在 Windows 操作系统下或可用 dir 命令代替。 首先请在 shell 中进入一个非空目录,便于实践,并在 shell 中执行如下命令: $ ls

  • 假设有一个进行各种数据库查询的库: 我想同时执行这两个查询,而不必添加到方法签名或添加装饰器。这些功能不应该完全依赖于异步。 使用中的非异步函数的最佳方法是什么? 我在找这样的东西: 提前感谢您的考虑和回复。

  • 这可能是一个更巧妙的问题,但我在ViewComponent类中有以下方法 所以我的问题是我应该采取什么方法?让异步在那里与警告无关,还是有一个解决方案/修复这个警告?它对我的项目有那么大的影响吗? 谢了!

  • 问题内容: 假设有一个可以进行各种数据库查询的库: 我想同时执行这两个查询,而不必添加到方法签名或添加装饰器。这些功能完全不应该依赖于异步。 在其中利用这些非异步功能的最佳方法是什么? 我正在寻找某种形式的东西: 预先感谢您的考虑和回应。 问题答案: 如果某个函数在本质上是阻塞的而不是异步的,则在事件循环内运行该函数的唯一正确方法是使用run_in_executor在线程内运行该函数: 它确实起作