python异步asyncio函数多线程操作run_in_executor和run_coroutine_threadsafe解析

澹台季萌
2023-12-01

刚接触异步asyncio,发现python文档一个版本一个变化,恼火。
东拼西凑终于解决了asyncio的多线程操作,记录下来。对于一个门外汉。太难了。

首先python3.9版本终于封装了一个多线程函数

asyncio.to_thread(func, /, *args, **kwargs) #函数直接开线程,传递参数。感觉这才像python

对于3.9版本以下,asyncio多线程实现就是,两个函数:

run_in_executor 和 run_coroutine_threadsafe

对应两种需求。
一个是主线程,直接新开线程,运行阻塞函数。方式就像之前写的同步代码一样。代码如下(用到了run_in_executor)

from concurrent.futures import ThreadPoolExecutor
import asyncio,time

def zusehanshu():   #测试的阻塞函数
           time.sleep(10)
           threadmingzi()
           return "after 10s"
           
async def returnfuture(): #测试新建线程
        loop=asyncio.get_event_loop() 
        newexecutor=ThreadPoolExecutor()
        future=await loop.run_in_executor(newexecutor,zusehanshu)#执行阻塞函数
        print(future)
        
def threadmingzi(): #查看当前线程名字
        import threading
        print("当前线程名字:",threading.current_thread())
        
asyncio.run(returnfuture())

输出结果:
当前线程名字: <Thread(ThreadPoolExecutor-0_0, started daemon 3252)>
after 10s(10s后输出)

上面可以看到,已经新建线程中运行了
第二种是主线程,直接新开线程,运行项目循环event-loop,推送函数到event-loop,达到多线程运行。(用到了run_coroutine_threadsafe)

import asyncio,time,threading

async def zusehanshu():   #测试的阻塞函数
        await asyncio.sleep(10)
        threadmingzi()
        
def threadmingzi(): #查看当前线程名字
        print("当前线程名字:",threading.current_thread())
        
def startloop(loop): #新建event-loop
        threadmingzi() #看一下线程名字
        asyncio.set_event_loop(loop)
        loop.run_forever()
        
xinjianloop=asyncio.new_event_loop()
threading.Thread(target=startloop,args=(xinjianloop,)).start()
asyncio.run_coroutine_threadsafe(zusehanshu(),xinjianloop)

输出结果:
当前线程名字: <Thread(Thread-1, started 7672)>
当前线程名字: <Thread(Thread-1, started 7672)>
上面可以看到,已经新建线程中运行了

总结:
asyncio原理就是遍历event-loop获取消息,是一个单线程阻塞函数。
遍历过程中遇到某个协程阻塞就会卡住,会一直等待结果返回,这时候就需要用到多线程防止阻塞。
通过上面两个方式,run_in_executor,与同步函数调用相似。用的比较习惯。
而run_coroutine_threadsafe新线程中建立新event-loop,可以动态添加协程,这个用途自己考虑了。

 类似资料: