相对于异步的概念,同步指的是一个程序执行完才会执行另一个程序。
举例来说,对于单进程,普通的函数调用就是同步执行。只有当被调用的函数执行完并且return的时候,
才会继续执行主调函数中剩下的代码。
虽然在异步中通过await等待异步函数执行也是这个流程,但是如果一个进程有多于一个任务时,
当在被调的异步函数中阻塞时,就会去执行其他任务了。
python中的语法是
async 用来声明一个函数为异步函数
await 用来声明等待await后面跟的异步函数执行完返回
最开始有一个错误的理解:假设有伪代码await func_a
,在执行到伪代码await func_a
时,产生一个类似线程的东西去执行func_a函数
而不影响当前await func_a代码之后的代码的运行,即立马执行下面的语句。
但是不是这样子,因为有了await,所以需要等待func_a执行完才会继续执行await func_a后面的代码。
还有一个错误的想法:都异步了,为什么还需要等待结果?
answer:假设func_a函数处理完会返回一个结果,如果这时候不阻塞在await func_a这一行,而是继续向下执行
那么如果下一行代码需要用到func_a函数返回的结果,那么这是时候用到的这个结果就是一个None
比如一下代码
import asyncio
async def func_a():
await asyncio.sleep(2)
print(1)
async def func_b():
await func_a()
print(2)
asyncio.run(func_b())
当前只有一个任务,执行了func_b(),而func_b中等待func_a,所以在func_a中sleep了两秒后
打印1,再返回到func_b中打印2。
这就是因为当前只有一个任务,并且使用await来等待异步函数执行完返回后才会继续执行下面代码。
如果还有其他任务的话,执行到func_a的sleep时才会执行其他任务
可以理解成是一个死循环,循环检测并执行某些代码。
伪代码
任务列表 = [任务1,任务2,任务3。。。]
while True:
可执行的任务列表,已完成的任务列表 = 去任务列表中检查所有的任务,将可执行和已完成的任务返回。
# 注意任务列表中还可能有正在被阻塞的任务,本次查询不会被返回
for 就绪任务 in 可执行的任务列表:
执行已就绪的任务
for 已完成任务 in 已完成的任务列表:
在任务列表中移除 已完成的任务
如果任务列表中的任务都已经完成,任务列表为空,则终止循环。
代码:
import asyncio
# 生成一个事件循环
loop = asyncio.get_event_loop()
# 将任务放到任务列表
loop.run_until_complete(任务)
进程是指程序的一次运行过程,包括了程序、代码和进程控制块(PCB),是系统进行资源分配的独立单位,也是处理机分派和调度的基本单位。
后来为了让操作系统更好的处理并发,引入了线程。此时把线程作为处理及分派和调度的基本单位。一个进程中的多个线程共享该进程的资源,因此切换上下文会变快。
进程和线程都是操作系统级别的概念。
而协程是为了进一步提高上下文切换的效率,由程序员自己通过代码进行调度。
一个进程可以对应多个线程,一个线程可以对应多个协程
但是一个进程中的多个线程在某一时刻只能有一个线程在运行,同理一个线程对应的多个协程在某一时刻只能有一个协程在运行。
协程的概念是在一个线程中如果遇到IO或其他等待操作,这个线程会利用空闲时间处理其他任务。但前提是这个线程有多个协程任务。
import asyncio
async def func1():
print(1)
await asyncio.sleep(2) # 遇到IO自动切换
print(2)
async def func2():
print(3)
await asyncio.sleep(2)
print(4)
def main():
tasks = [
func1(),
func2()
]
# 下面这两行可以理解为创建了两个任务
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))
if __name__ == '__main__':
main()
运行结果打印顺序为1324
理解首先主线程运行一个协程func1,打印1,然后遇到sleep了,这时挂起func1去执行另一个任务func2打印3
当func1的sleep完成后,不管执行func2的协程执行到哪里都会返回到func1去向下执行,打印2后协程结束
再去执行func2的打印4
协程函数:定义函数时用async def
协程对象:执行协程函数的到协程对象
async def func():
pass
result = func() result成为一个协程对象,此时func内部代码不会执行
如果想要运行协程函数内部代码,必须要将协程对象交给事件循环来处理
import asyncio
async def func():
print(1)
result = func()
# 生成一个事件循环
loop = asyncio.get_event_loop()
# 将任务放到任务列表
loop.run_until_complete(result)
python3.7后使用asynic.run(任务)
import asyncio
async def func():
print(1)
result = func()
asyncio.run(任务) run包括了生成事件循环列表
await + 可等待的对象(协程对象、Future对象、Task对象)
import asyncio
async def others():
print("start")
response = await asyncio.sleep(2)
print("end")
return 'return-value'
async def func():
print("执行协程函数内部代码")
# 遇到IO操作挂起当前协程(任务),等IO操作完成后再继续往下执行,
# 当前协程挂起时,事件循环可以去执行其他协程(任务)
response = await others()
print("IO请求结束,结果为:", response)
asyncio.run(func())
运行结果为
执行协程函数内部代码
start
end
IO请求结束,结果为: return-value
await就是等待对象的值得到结果之后再继续向下走
Task对象的作用是在事件循环中添加多个任务
Task用于并发调度协程,通过asyncio.create_task(协程对象)的
方式创建Task对象,这样可以让协程加入事件循环中等待被调用执行。
除了使用asyncio.create_task()外,还可以用低级loop.create_task()
或ensure_future()函数。
一种不普遍的写法
import asyncio
async def func():
print(1)
await asyncio.sleep(2)
print(2)
return 'return-value'
async def main():
print('main start')
# 创建Task对象,将当前执行func函数的任务添加到事件循环
task1 = asyncio.create_task(func())
task2 = asyncio.create_task(func())
ret1 = await task1
ret2 = await task2
print('main end')
print(ret1, ret2)
asyncio.run(main())
运行结果
main start
1
1
2
2
main end
return-value return-value
普遍的写法
import asyncio
async def func():
print(1)
await asyncio.sleep(2)
print(2)
return 'return-value'
async def main():
print('main start')
# 创建Task对象,将当前执行func函数的任务添加到事件循环
task_list = [
asyncio.create_task(func(), name='t1'),
asyncio.create_task(func(), name='t2')
]
# 等待列表中的任务完成
done, pending = await asyncio.wait(task_list)
# asyncio.wait返回一个二元组
print('main end')
print(done)
asyncio.run(main()) # 这一步就创建了事件循环
运行结果
main start
1
1
2
2
main end
{<Task finished name='t1' coro=<func() done, defined at F:\pycharm_project\main.py:3> result='return-value'>, <Task finished name='t2' coro=<func() done, defined at F:\pycharm_project\main.py:3> result='return-value'>}
也可以这么写
import asyncio
async def func():
print(1)
await asyncio.sleep(2)
print(2)
return 'return-value'
task_list = [
func(),
func()
]
done, pending = asyncio.run(asyncio.wait(task_list))
print(done)
Task继承自Future,Task对象内部await结果的处理基于Future对象
示例1
async def main():
# 获取创建的事件循环
loop = asyncio.get_running_loop()
# 创建一个任务(Future对象),什么都不干
fut = loop.create_future()
# 等待Future对象,没有结果会一直等下去
await fut
asyncio.run(main())
示例2
import asyncio
async def set_after(fut):
await asyncio.sleep(2)
fut.set_result('555')
async def main():
# 获取创建的事件循环
loop = asyncio.get_running_loop()
# 创建一个任务(Future对象),什么都不干
fut = loop.create_future()
# 创建一个任务(Task对象),绑定set_after函数,函数内部2s后,会给fut赋值
# 即手动设置future任务最终结果
await loop.create_task(set_after(fut))
# 等待Future对象
data = await fut
print(data)
asyncio.run(main())
concurrent.futures.Future对象(扩展)
使用线程池、进程池实现异步操作时用到的对象
import time
from concurrent.futures import Future
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import ProcessPoolExecutor
def func(value):
time.sleep(1)
print(value)
return 123
# 创建线程池
pool = ThreadPoolExecutor(max_workers=5)
# 创建进程池
# pool = ProcessPoolExecutor(max_workers=5)
for i in range(10):
fut = pool.submit(func, i)
# print(fut)
从输出结果可以看到执行结果无序
案例:asyncio+不支持异步模块
import asyncio
import requests
async def download_image(url):
print('start download:', url)
loop = asyncio.get_event_loop()
# requests模块不支持异步操作,使用线程池配合实现
future = loop.run_in_executor(None, requests.get, url)
response = await future
print('download finish')
file_name = url.rsplit('-')[-1]
with open(file_name, mode='wb') as fp:
fp.write(response.content)
if __name__ == "__main__":
url_list = [
'url1', 'url2', 'url3'
]
tasks = [download_image(url) for url in url_list]
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))
异步迭代器:实现了__aiter__()
和__anext__()
方法的对象(这两个方法自动执行),必须返回一个awaitable
对象。async_for
支持处理异步迭代器的__anext__()
方法返回的可等待对象,直到引发一个stopAsyncIteration
异常
异步可迭代对象:可在async_for
语句中被使用的对象,必须通过它的__aiter__()
方法返回一个asynchronous_iterator
(异步迭代器)
import asyncio
class Reader(object):
# 自定义异步迭代器,同时也是异步可迭代对象
def __init__(self):
self.count = 0
async def readline(self):
self.count += 1
if self.count == 20:
return None
return self.count
def __aiter__(self):
return self
async def __anext__(self):
val = await self.readline()
if val == None:
raise StopAsyncIteration
return val
async def func():
obj = Reader()
async for item in obj:
print(item)
asyncio.run(func())
通过定义__aenter__()
和__aexit__()
方法来对async_with语句中的环境进行控制,这两个方法自动执行。
import asyncio
class AsyncContextManager:
def __init__(self):
self.conn = None
async def do_something(self):
# 异步操作数据库
return 555
async def __aenter__(self):
# 异步连接数据库
self.conn = await asyncio.sleep(1)
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
# 异步关闭数据库
await asyncio.sleep(1)
async def func():
async with AsyncContextManager() as f:
result = await f.do_something()
print(result)
asyncio.run(func())