当前位置: 首页 > 工具软件 > asyncio > 使用案例 >

python异步asyncio

秦英发
2023-12-01

异步概念

相对于异步的概念,同步指的是一个程序执行完才会执行另一个程序。
举例来说,对于单进程,普通的函数调用就是同步执行。只有当被调用的函数执行完并且return的时候,
才会继续执行主调函数中剩下的代码。

虽然在异步中通过await等待异步函数执行也是这个流程,但是如果一个进程有多于一个任务时,
当在被调的异步函数中阻塞时,就会去执行其他任务了。

python中的语法是
async 用来声明一个函数为异步函数
await 用来声明等待await后面跟的异步函数执行完返回

最开始有一个错误的理解:假设有伪代码await func_a,在执行到伪代码await func_a时,产生一个类似线程的东西去执行func_a函数
而不影响当前await func_a代码之后的代码的运行,即立马执行下面的语句。
但是不是这样子,因为有了await,所以需要等待func_a执行完才会继续执行await func_a后面的代码。

还有一个错误的想法:都异步了,为什么还需要等待结果?
answer:假设func_a函数处理完会返回一个结果,如果这时候不阻塞在await func_a这一行,而是继续向下执行
那么如果下一行代码需要用到func_a函数返回的结果,那么这是时候用到的这个结果就是一个None

比如一下代码

import asyncio

async def func_a():
    await asyncio.sleep(2)
    print(1)
async def func_b():
    await func_a()
    print(2)
asyncio.run(func_b())

当前只有一个任务,执行了func_b(),而func_b中等待func_a,所以在func_a中sleep了两秒后
打印1,再返回到func_b中打印2。
这就是因为当前只有一个任务,并且使用await来等待异步函数执行完返回后才会继续执行下面代码。
如果还有其他任务的话,执行到func_a的sleep时才会执行其他任务

事件循环

可以理解成是一个死循环,循环检测并执行某些代码。

伪代码
任务列表 = [任务1,任务2,任务3。。。] 
while True:
     可执行的任务列表,已完成的任务列表 =  去任务列表中检查所有的任务,将可执行和已完成的任务返回。
     # 注意任务列表中还可能有正在被阻塞的任务,本次查询不会被返回
     for 就绪任务 in 可执行的任务列表:
		执行已就绪的任务

     for 已完成任务 in 已完成的任务列表:
 		在任务列表中移除 已完成的任务

	 如果任务列表中的任务都已经完成,任务列表为空,则终止循环。
	

代码:	
import asyncio

# 生成一个事件循环
loop = asyncio.get_event_loop()
# 将任务放到任务列表
loop.run_until_complete(任务)

进程、线程、协程

进程是指程序的一次运行过程,包括了程序、代码和进程控制块(PCB),是系统进行资源分配的独立单位,也是处理机分派和调度的基本单位。
后来为了让操作系统更好的处理并发,引入了线程。此时把线程作为处理及分派和调度的基本单位。一个进程中的多个线程共享该进程的资源,因此切换上下文会变快。
进程和线程都是操作系统级别的概念。
协程是为了进一步提高上下文切换的效率,由程序员自己通过代码进行调度

一个进程可以对应多个线程,一个线程可以对应多个协程
但是一个进程中的多个线程在某一时刻只能有一个线程在运行,同理一个线程对应的多个协程在某一时刻只能有一个协程在运行。

协程的概念是在一个线程中如果遇到IO或其他等待操作,这个线程会利用空闲时间处理其他任务。但前提是这个线程有多个协程任务。

import asyncio

async def func1():
    print(1)
    await asyncio.sleep(2) # 遇到IO自动切换
    print(2)

async def func2():
    print(3)
    await asyncio.sleep(2)
    print(4)

def main():
    tasks = [
        func1(),
        func2()
    ]
    # 下面这两行可以理解为创建了两个任务
    loop = asyncio.get_event_loop()
    loop.run_until_complete(asyncio.wait(tasks))

if __name__ == '__main__':
    main()

运行结果打印顺序为1324
理解首先主线程运行一个协程func1,打印1,然后遇到sleep了,这时挂起func1去执行另一个任务func2打印3
当func1的sleep完成后,不管执行func2的协程执行到哪里都会返回到func1去向下执行,打印2后协程结束
再去执行func2的打印4

协程函数与协程对象

协程函数:定义函数时用async def
协程对象:执行协程函数的到协程对象

async def func():
    pass	
result = func()       result成为一个协程对象,此时func内部代码不会执行

如果想要运行协程函数内部代码,必须要将协程对象交给事件循环来处理

import asyncio
async def func():
	print(1)
result = func()
# 生成一个事件循环
loop = asyncio.get_event_loop()
# 将任务放到任务列表
loop.run_until_complete(result)

python3.7后使用asynic.run(任务)
import asyncio
async def func():
	print(1)
result = func()
asyncio.run(任务)			run包括了生成事件循环列表

await

await + 可等待的对象(协程对象、Future对象、Task对象)

import asyncio

async def others():
    print("start")
	response = await asyncio.sleep(2)
	print("end")
	return 'return-value'

async def func():
	print("执行协程函数内部代码")
	# 遇到IO操作挂起当前协程(任务),等IO操作完成后再继续往下执行,
	# 当前协程挂起时,事件循环可以去执行其他协程(任务)
	response = await others()
	print("IO请求结束,结果为:", response)

asyncio.run(func())

运行结果为
执行协程函数内部代码
start
end
IO请求结束,结果为: return-value

await就是等待对象的值得到结果之后再继续向下走

Task对象

Task对象的作用是在事件循环中添加多个任务

Task用于并发调度协程,通过asyncio.create_task(协程对象)的
方式创建Task对象,这样可以让协程加入事件循环中等待被调用执行。
除了使用asyncio.create_task()外,还可以用低级loop.create_task()
或ensure_future()函数。

一种不普遍的写法
import asyncio

async def func():
    print(1)
    await asyncio.sleep(2)
    print(2)
    return 'return-value'

async def main():
    print('main start')
    # 创建Task对象,将当前执行func函数的任务添加到事件循环
    task1 = asyncio.create_task(func())
    task2 = asyncio.create_task(func())

    ret1 = await task1
    ret2 = await task2
    print('main end')
    print(ret1, ret2)

asyncio.run(main())

运行结果
main start
1
1
2
2
main end
return-value return-value
普遍的写法
import asyncio

async def func():
    print(1)
    await asyncio.sleep(2)
    print(2)
    return 'return-value'

async def main():
    print('main start')
    # 创建Task对象,将当前执行func函数的任务添加到事件循环
    task_list = [
        asyncio.create_task(func(), name='t1'),
        asyncio.create_task(func(), name='t2')
    ]

    # 等待列表中的任务完成
    done, pending = await asyncio.wait(task_list)
    # asyncio.wait返回一个二元组
    print('main end')
    print(done)

asyncio.run(main())  # 这一步就创建了事件循环


运行结果
main start
1
1
2
2
main end
{<Task finished name='t1' coro=<func() done, defined at F:\pycharm_project\main.py:3> result='return-value'>, <Task finished name='t2' coro=<func() done, defined at F:\pycharm_project\main.py:3> result='return-value'>}
也可以这么写
import asyncio

async def func():
    print(1)
    await asyncio.sleep(2)
    print(2)
    return 'return-value'

task_list = [
    func(),
    func()
]

done, pending = asyncio.run(asyncio.wait(task_list))
print(done)

asyncio.Future对象

Task继承自Future,Task对象内部await结果的处理基于Future对象

示例1
async def main():
	# 获取创建的事件循环
	loop = asyncio.get_running_loop()
	# 创建一个任务(Future对象),什么都不干
	fut = loop.create_future()
	# 等待Future对象,没有结果会一直等下去
	await fut
asyncio.run(main())
示例2
import asyncio
async def set_after(fut):
	await asyncio.sleep(2)
	fut.set_result('555')
	
async def main():
	# 获取创建的事件循环
	loop = asyncio.get_running_loop()
	# 创建一个任务(Future对象),什么都不干
	fut = loop.create_future()
	# 创建一个任务(Task对象),绑定set_after函数,函数内部2s后,会给fut赋值
	# 即手动设置future任务最终结果
	await loop.create_task(set_after(fut))
	# 等待Future对象
	data = await fut
	print(data)
	
asyncio.run(main())

concurrent.futures.Future对象(扩展)

使用线程池、进程池实现异步操作时用到的对象

import time
from concurrent.futures import Future
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import ProcessPoolExecutor

def func(value):
	time.sleep(1)
	print(value)
	return 123
# 创建线程池
pool = ThreadPoolExecutor(max_workers=5)
# 创建进程池
# pool = ProcessPoolExecutor(max_workers=5)

for i in range(10):
	fut = pool.submit(func, i)
	# print(fut)


从输出结果可以看到执行结果无序
案例:asyncio+不支持异步模块

import asyncio
import requests

async def download_image(url):
	print('start download:', url)
	
	loop = asyncio.get_event_loop()
	# requests模块不支持异步操作,使用线程池配合实现
	future = loop.run_in_executor(None, requests.get, url)
	
	response = await future
	print('download finish')
	
	file_name = url.rsplit('-')[-1]
	with open(file_name, mode='wb') as fp:
		fp.write(response.content)
		
if __name__ == "__main__":
	url_list = [
		'url1', 'url2', 'url3'
	]
	tasks = [download_image(url) for url in url_list]
	loop = asyncio.get_event_loop()
	loop.run_until_complete(asyncio.wait(tasks))

异步迭代器

异步迭代器:实现了__aiter__()__anext__()方法的对象(这两个方法自动执行),必须返回一个awaitable对象。async_for支持处理异步迭代器的__anext__()方法返回的可等待对象,直到引发一个stopAsyncIteration异常

异步可迭代对象:可在async_for语句中被使用的对象,必须通过它的__aiter__()方法返回一个asynchronous_iterator(异步迭代器)

import asyncio

class Reader(object):
    # 自定义异步迭代器,同时也是异步可迭代对象
    def __init__(self):
        self.count = 0
    async def readline(self):
        self.count += 1
        if self.count == 20:
            return None
        return self.count
    def __aiter__(self):
        return self
    async def __anext__(self):
        val = await self.readline()
        if val == None:
            raise StopAsyncIteration
        return val
async def func():
    obj = Reader()
    async for item in obj:
        print(item)

asyncio.run(func())

异步上下文管理器

通过定义__aenter__()__aexit__()方法来对async_with语句中的环境进行控制,这两个方法自动执行。

import asyncio

class AsyncContextManager:
    def __init__(self):
        self.conn = None

    async def do_something(self):
        # 异步操作数据库
        return 555

    async def __aenter__(self):
        # 异步连接数据库
        self.conn = await asyncio.sleep(1)
        return self
    async def __aexit__(self, exc_type, exc_val, exc_tb):
        # 异步关闭数据库
        await asyncio.sleep(1)

async def func():
    async with AsyncContextManager() as f:
        result = await f.do_something()
        print(result)

asyncio.run(func())
 类似资料: