当前位置: 首页 > 知识库问答 >
问题:

关于python的asyncio的timeout?

汝楷
2024-07-08

需求:
一个asyncio.Queue()作为中间交换。一个线程往里put, 一个线程从里get. 使用future总是取不得值.

示例demo:

import asyncio
import threading, time

audioQueue = asyncio.Queue()

def job(task,loop):
    def _asyncJob(task, loop):
        result = 'hello'
        time.sleep(1)
        asyncio.run_coroutine_threadsafe(task(result), loop)
        loop.call_soon_threadsafe(lambda: print('size:%s' % audioQueue.qsize()))

    threading.Thread(name=f"transcribe_thread_1", target=_asyncJob, args=(task, loop)).start()

def sync_function(loop):
    # 这是一个同步函数
    task = lambda result : audioQueue.put(result)
    job(task, loop)

def getEle(loop):
    
    try:
        future = asyncio.run_coroutine_threadsafe(audioQueue.get(), loop)
        chnStatement = future.result(0.1)
    except asyncio.TimeoutError as exout:
        chnStatement = 'timeout'
    except asyncio.CancelledError as excel:
        chnStatement = 'cancel'
    except Exception as exc:
        chnStatement = 'exception'
    
    print(f'get element {chnStatement}')

async def main():
    # get the current event loop
    loop = asyncio.get_running_loop()
    sync_function(loop)
    threading.Thread(name=f"transcribe_thread_2", target=getEle, args=(loop,)).start()

if __name__ == '__main__':
    asyncio.run(main())

get为什么需要timeout, 因为一开始put的过程要滞后一些,而get要求实时。要是使用await,线程会一直卡住

环境:
python: 3.10.x

共有2个答案

胡厉刚
2024-07-08

asyncio是单线程并发。

关于 asyncio.Queue() 给你一个代码例子:

import asyncio

async def producer(queue):
    for i in range(5):
        await queue.put(i)
        print(f"Produced: {i}")
        await asyncio.sleep(1)

async def consumer(queue):
    while True:
        item = await queue.get()
        print(f"Consumed: {item}")
        queue.task_done()

async def main():
    queue = asyncio.Queue()
    producer_task = asyncio.create_task(producer(queue))
    consumer_task = asyncio.create_task(consumer(queue))
    await asyncio.gather(producer_task, consumer_task)
    await queue.join()

asyncio.run(main())

producer 函数负责向队列中放入数据。
consumer 函数从队列中取出数据。
在 main 函数中,创建了一个 asyncio.Queue 对象,并启动了生产者和消费者的任务。
通过 queue.join() 确保所有的任务都完成处理。

季嘉良
2024-07-08

首先,要澄清的是,asyncio 是为单线程并发设计的,它并不直接支持多线程间的同步。在你的例子中,你试图在一个线程中使用 asyncioQueue,并在另一个线程中尝试从该队列中同步地获取数据,这是有问题的。

然而,关于你的问题,future.result(timeout) 需要一个超时的原因是因为 future.result() 是一个阻塞调用,它会等待 Future 完成(即,等待与之关联的协程完成)。如果没有设置超时,并且 Future 永远不会完成(例如,由于某种错误或死锁),那么调用 future.result() 的线程将会永远阻塞。

在你的例子中,由于 audioQueue.get() 是一个协程,你需要在一个协程中等待它,而不是在一个线程中同步地等待它的结果。但是,由于你需要在另一个线程中调用它,你使用了 asyncio.run_coroutine_threadsafe(),它返回一个 Future,你可以等待这个 Future 的结果。

但是,由于 audioQueue.get() 可能会阻塞(如果没有可用的项目),你需要设置一个超时来防止调用 future.result() 的线程无限期地等待。

然而,你的代码有几个问题:

  1. time.sleep(1) 会阻塞整个线程,包括事件循环,这可能导致问题。你应该使用 await asyncio.sleep(1) 代替,但这只能在协程中使用。
  2. 你试图在一个线程中运行协程(使用 asyncio.run_coroutine_threadsafe()),但在另一个线程中同步地等待它的结果。这可能导致竞争条件和难以调试的问题。
  3. audioQueue.qsize() 不是一个线程安全的操作,并且在多线程环境中使用它可能是危险的。

一个更好的方法可能是使用 asyncio 的同步原语(如锁和条件变量)来在协程之间同步,而不是在线程之间同步。但是,如果你确实需要在多线程环境中使用 asyncio 的队列,你应该考虑重新设计你的代码以避免需要超时。

不过,对于你当前的问题,解决方案是使用超时来防止无限期地等待 Future 的结果。但是,请注意,这只是一个临时的解决方案,并且可能需要更深入地考虑你的多线程和 asyncio 集成策略。

 类似资料:
  • 我发现 await 并没有用 update_product_loop 还是立刻就执行力,那 await 和 async 的到底是什么含义,以及我要怎么才能做到真正的等 异步任务 a 完成再去其它呢,就是说 a 里有很多子任务是异步的

  • The fear of the LORD is the beginning of knowledge; fools despise wisdom and instruction.(PROVERBS 1:7) 敬畏耶和华是知识的开端,愚妄人藐视智慧和训诲。 关于Python的故事 如同学习任何一种自然语言比如英语、或者其它编程语言比如汇编一样,总要说一说有关这种语言的事情,有的可能就是八卦,越八卦的

  • 问题内容: 我正在尝试解决此错误:在我的异步过程中。我相信这是因为在任务仍未完成时发生故障,然后尝试关闭事件循环。我以为我需要在关闭事件循环之前等待其余的响应,但是我不确定如何在我的特定情况下正确完成该操作。 我该如何处理错误并正确关闭事件循环,以便可以启动一个新程序并从本质上重新启动整个程序并继续。 编辑: 根据这个答案,这就是我现在正在尝试的方法。不幸的是,这种错误很少发生,因此,除非我可以强

  • 作为一个 Python 新手,你必须熟悉基础知识。在本文中我们将讨论一些 Python 面试的基础问题和高级问题以及答案,以帮助你完成面试。包括 Python 开发问题、编程问题、数据结构问题、和 Python 脚本问题。让我们来深入研究这些问题。

  • Python 是一种极少数能声言兼具 简单 与 功能强大 的编程语言。你将惊异于发现你正在使用的这门编程语言是如此简单,它专注于如何解决问题,而非拘泥于语法与结构。 官方对 Python 的介绍如下: Python 是一款易于学习且功能强大的编程语言。 它具有高效率的数据结构,能够简单又有效地实现面向对象编程。Python 简洁的语法与动态输入之特性,加之其解释性语言的本质,使得它成为一种在多种领

  • 问题内容: 我有以下情况: Python 3.6+ 从文件中逐行读取输入数据。 协程将数据发送到API(使用),并将调用结果保存到Mongo(使用)。因此,有很多IO正在进行。 该代码使用/编写,并且对于手动执行的单个调用也可以正常工作。 我不知道该怎么做,就是要大量使用输入数据。 我看到的所有示例都通过发送有限列表作为参数进行了演示。但是我不能简单地向它发送任务列表,因为输入文件可能有数百万行。