需求:
一个asyncio.Queue()
作为中间交换。一个线程往里put, 一个线程从里get. 使用future总是取不得值.
示例demo:
import asyncio
import threading, time
audioQueue = asyncio.Queue()
def job(task,loop):
def _asyncJob(task, loop):
result = 'hello'
time.sleep(1)
asyncio.run_coroutine_threadsafe(task(result), loop)
loop.call_soon_threadsafe(lambda: print('size:%s' % audioQueue.qsize()))
threading.Thread(name=f"transcribe_thread_1", target=_asyncJob, args=(task, loop)).start()
def sync_function(loop):
# 这是一个同步函数
task = lambda result : audioQueue.put(result)
job(task, loop)
def getEle(loop):
try:
future = asyncio.run_coroutine_threadsafe(audioQueue.get(), loop)
chnStatement = future.result(0.1)
except asyncio.TimeoutError as exout:
chnStatement = 'timeout'
except asyncio.CancelledError as excel:
chnStatement = 'cancel'
except Exception as exc:
chnStatement = 'exception'
print(f'get element {chnStatement}')
async def main():
# get the current event loop
loop = asyncio.get_running_loop()
sync_function(loop)
threading.Thread(name=f"transcribe_thread_2", target=getEle, args=(loop,)).start()
if __name__ == '__main__':
asyncio.run(main())
get为什么需要timeout, 因为一开始put的过程要滞后一些,而get要求实时。要是使用await,线程会一直卡住
环境:
python: 3.10.x
asyncio
是单线程并发。
关于 asyncio.Queue()
给你一个代码例子:
import asyncio
async def producer(queue):
for i in range(5):
await queue.put(i)
print(f"Produced: {i}")
await asyncio.sleep(1)
async def consumer(queue):
while True:
item = await queue.get()
print(f"Consumed: {item}")
queue.task_done()
async def main():
queue = asyncio.Queue()
producer_task = asyncio.create_task(producer(queue))
consumer_task = asyncio.create_task(consumer(queue))
await asyncio.gather(producer_task, consumer_task)
await queue.join()
asyncio.run(main())
producer 函数负责向队列中放入数据。
consumer 函数从队列中取出数据。
在 main 函数中,创建了一个 asyncio.Queue 对象,并启动了生产者和消费者的任务。
通过 queue.join() 确保所有的任务都完成处理。
首先,要澄清的是,asyncio
是为单线程并发设计的,它并不直接支持多线程间的同步。在你的例子中,你试图在一个线程中使用 asyncio
的 Queue
,并在另一个线程中尝试从该队列中同步地获取数据,这是有问题的。
然而,关于你的问题,future.result(timeout)
需要一个超时的原因是因为 future.result()
是一个阻塞调用,它会等待 Future
完成(即,等待与之关联的协程完成)。如果没有设置超时,并且 Future
永远不会完成(例如,由于某种错误或死锁),那么调用 future.result()
的线程将会永远阻塞。
在你的例子中,由于 audioQueue.get()
是一个协程,你需要在一个协程中等待它,而不是在一个线程中同步地等待它的结果。但是,由于你需要在另一个线程中调用它,你使用了 asyncio.run_coroutine_threadsafe()
,它返回一个 Future
,你可以等待这个 Future
的结果。
但是,由于 audioQueue.get()
可能会阻塞(如果没有可用的项目),你需要设置一个超时来防止调用 future.result()
的线程无限期地等待。
然而,你的代码有几个问题:
time.sleep(1)
会阻塞整个线程,包括事件循环,这可能导致问题。你应该使用 await asyncio.sleep(1)
代替,但这只能在协程中使用。asyncio.run_coroutine_threadsafe()
),但在另一个线程中同步地等待它的结果。这可能导致竞争条件和难以调试的问题。audioQueue.qsize()
不是一个线程安全的操作,并且在多线程环境中使用它可能是危险的。一个更好的方法可能是使用 asyncio
的同步原语(如锁和条件变量)来在协程之间同步,而不是在线程之间同步。但是,如果你确实需要在多线程环境中使用 asyncio
的队列,你应该考虑重新设计你的代码以避免需要超时。
不过,对于你当前的问题,解决方案是使用超时来防止无限期地等待 Future
的结果。但是,请注意,这只是一个临时的解决方案,并且可能需要更深入地考虑你的多线程和 asyncio
集成策略。
我发现 await 并没有用 update_product_loop 还是立刻就执行力,那 await 和 async 的到底是什么含义,以及我要怎么才能做到真正的等 异步任务 a 完成再去其它呢,就是说 a 里有很多子任务是异步的
The fear of the LORD is the beginning of knowledge; fools despise wisdom and instruction.(PROVERBS 1:7) 敬畏耶和华是知识的开端,愚妄人藐视智慧和训诲。 关于Python的故事 如同学习任何一种自然语言比如英语、或者其它编程语言比如汇编一样,总要说一说有关这种语言的事情,有的可能就是八卦,越八卦的
问题内容: 我正在尝试解决此错误:在我的异步过程中。我相信这是因为在任务仍未完成时发生故障,然后尝试关闭事件循环。我以为我需要在关闭事件循环之前等待其余的响应,但是我不确定如何在我的特定情况下正确完成该操作。 我该如何处理错误并正确关闭事件循环,以便可以启动一个新程序并从本质上重新启动整个程序并继续。 编辑: 根据这个答案,这就是我现在正在尝试的方法。不幸的是,这种错误很少发生,因此,除非我可以强
作为一个 Python 新手,你必须熟悉基础知识。在本文中我们将讨论一些 Python 面试的基础问题和高级问题以及答案,以帮助你完成面试。包括 Python 开发问题、编程问题、数据结构问题、和 Python 脚本问题。让我们来深入研究这些问题。
Python 是一种极少数能声言兼具 简单 与 功能强大 的编程语言。你将惊异于发现你正在使用的这门编程语言是如此简单,它专注于如何解决问题,而非拘泥于语法与结构。 官方对 Python 的介绍如下: Python 是一款易于学习且功能强大的编程语言。 它具有高效率的数据结构,能够简单又有效地实现面向对象编程。Python 简洁的语法与动态输入之特性,加之其解释性语言的本质,使得它成为一种在多种领
问题内容: 我有以下情况: Python 3.6+ 从文件中逐行读取输入数据。 协程将数据发送到API(使用),并将调用结果保存到Mongo(使用)。因此,有很多IO正在进行。 该代码使用/编写,并且对于手动执行的单个调用也可以正常工作。 我不知道该怎么做,就是要大量使用输入数据。 我看到的所有示例都通过发送有限列表作为参数进行了演示。但是我不能简单地向它发送任务列表,因为输入文件可能有数百万行。