Python多任务学习笔记(7)——进程中,我们知道进程是Python中实现多任务的另一种方式,但是也了解到由于进程一般占用资源较大,当有很多个任务需要同时完成,如果一开始就创建和任务数相同的进程,将不仅导致创建进程的代码冗余量大,还将造成大量资源占用。
基于以上原因,Python中引入了进程池,可以调用multiprocessing包中的Pool()函数来进行创建。使用Pool()函数创建进程池时可以指定进程池能自动创建的进程最大数目(由于进程占用资源较大且设备的内存、CPU等资源有限,该数量通常由测试人员根据对设备的测试所得出的最优数值确定),这样后续当有新任务需要通过进程完成时,如果当进程池未满时,就会自动使用新的进程来完成该任务,如果进程池已满时,新任务将等待进程池中的任务结束后,使用已有进程完成任务。
下面代码实现:通过创建一个最多可容纳3个进程的进程池,完成10个任务。
from multiprocessing import Pool
import os
import time
import random
def work(task_num):
t_start = time.time()
# 使用os模块中的getpid()函数,可以获取当前运行的进程ID(PID:Process ID)
print("%d号任务开始执行,进程号为%d" % (task_num, os.getpid()))
time.sleep(random.random() * 2)
t_stop = time.time()
print("%d号任务执行完毕,耗时%0.2f" % (task_num, t_stop - t_start))
def main():
# 创建一个进程池,指定最大容量为3
pool = Pool(3)
for each in range(0, 10):
pool.apply_async(func=work, args=(each,))
print("Before calling close()...")
pool.close()
pool.join()
print("After calling join()...")
if __name__ == "__main__":
main()
运行上述代码后,可得到以下结果:
Before calling close()…
0号任务开始执行,进程号为10206
1号任务开始执行,进程号为10207
2号任务开始执行,进程号为10208
2号任务执行完毕,耗时0.83
3号任务开始执行,进程号为10208
1号任务执行完毕,耗时1.62
4号任务开始执行,进程号为10207
0号任务执行完毕,耗时1.65
5号任务开始执行,进程号为10206
5号任务执行完毕,耗时0.05
6号任务开始执行,进程号为10206
3号任务执行完毕,耗时1.39
7号任务开始执行,进程号为10208
7号任务执行完毕,耗时0.17
8号任务开始执行,进程号为10208
4号任务执行完毕,耗时1.39
9号任务开始执行,进程号为10207
6号任务执行完毕,耗时1.37
8号任务执行完毕,耗时0.96
9号任务执行完毕,耗时1.74
After calling join()…
分析上述程序运行输出,可知:程序执行后,由于任务数大于进程池最大容量,故系统很快创建了3个进程,且进程ID(pid)分别为10206、10207、10208,而后续的任务执行时,其进程ID都是这3个之一,故进程池创建的进程完成了当前任务后,当仍有未完成任务时,这些任务将复用进程池中处于空闲状态的进程。
上述示例代码中使用到了多个进程池对象方法,下面结合Python官方文档,详细分析各方法作用,以及上述代码需要强调之处。
该方法将在一个新的进程中调用func指定的函数,并通过args以元组形式向func中传递多值参数,通过kwds以字典形式向func中传递多键值对。由于该函数采用同步调用机制,故其会等待func执行完毕才返回。
下面是使用apply()方法的示例代码:
import multiprocessing
import os
import time
import random
def work(*args, **kwargs):
t_start = time.time()
print("%d号任务开始执行,进程号为%d" % (args[0], os.getpid()))
time.sleep(random.random() * 2)
t_stop = time.time()
print(kwargs)
print("%d号任务执行完毕,耗时%0.2f" % (args[0], t_stop - t_start))
print() # 换行使结果更清晰
def main():
# 定义一个进程池,指定最大容量3
process_pool = multiprocessing.Pool(processes=3)
for each in range(0, 5):
process_pool.apply(func=work,
args=(each,), kwds={"key": "value"})
process_pool.close()
if __name__ == "__main__":
main()
上述代码的运行结果为:
0号任务开始执行,进程号为6697
{‘key’: ‘value’}
0号任务执行完毕,耗时0.01
1号任务开始执行,进程号为6698
{‘key’: ‘value’}
1号任务执行完毕,耗时1.85
2号任务开始执行,进程号为6699
{‘key’: ‘value’}
2号任务执行完毕,耗时1.39
3号任务开始执行,进程号为6697
{‘key’: ‘value’}
3号任务执行完毕,耗时1.44
4号任务开始执行,进程号为6698
{‘key’: ‘value’}
4号任务执行完毕,耗时1.40
由上述代码的运行结果可知,由于apply()方法的同步调用机制,使用进程池执行的5项任务是顺序执行的。
- 此为apply()方法的变体,即该方法为异步调用,也就是说调用了该方法后,其会立马返回一个AsyncResult 对象,而该对象又具有一个采用同步调用机制的方法get(),用于等待func返回并获取其返回值,即pool_obj.apply_async()等价于pool_obj.apply().get()。
- 当指定callback参数时,其应当是一个仅接受一个参数(一个元组或字典,当作为一个整体,也都算是一个参数)的可调用对象(如函数引用),当func运行完成,则callback被调用。
- 当上述callback调用失败,且指定了error_callback(同样应当是一个仅接受一个参数的可调用对象(如函数引用)),则调用error_callback,若调用失败,则抛出异常。
下面是使用apply_async()方法的示例代码(关于代码中为什么要使用join()方法,请看下面关于该方法的说明):
import multiprocessing
import os
import time
import random
result_list = list()
def log_result(result):
result_list.append(result)
def work(*args, **kwargs):
t_start = time.time()
print("%d号任务开始执行,进程号为%d" % (args[0], os.getpid()))
time.sleep(random.random() * 2)
t_stop = time.time()
print(kwargs)
print("%d号任务执行完毕,耗时%0.2f" % (args[0], t_stop - t_start))
print() # 换行使结果更清晰
return args[0]
def main():
# 定义一个进程池,指定最大容量3
process_pool = multiprocessing.Pool(processes=3)
for each in range(0, 5):
process_pool.apply_async(func=work,
args=(each,), kwds={"key": "value"},
callback=log_result)
process_pool.close()
process_pool.join()
print(result_list)
if __name__ == "__main__":
main()
上述代码运行结果为:
0号任务开始执行,进程号为7144
1号任务开始执行,进程号为7145
2号任务开始执行,进程号为7146
{‘key’: ‘value’}
0号任务执行完毕,耗时0.16
3号任务开始执行,进程号为7144
{‘key’: ‘value’}
2号任务执行完毕,耗时0.73
4号任务开始执行,进程号为7146
{‘key’: ‘value’}
4号任务执行完毕,耗时0.28
{‘key’: ‘value’}
1号任务执行完毕,耗时1.55
{‘key’: ‘value’}
3号任务执行完毕,耗时1.71
[0, 2, 4, 1, 3]
结合上述代码的运行结果可知:
调用该方法后,进程池对象将不再接受提交任务的申请。
该方法并不是关闭进程池,可以将其理解为关闭进入进程池的入口,而在调用close()方法之前,即使进程池中所有的进程都被任务占据,依然可以将任务提交至进程池中,关于原因,翻看apply_async()的源码后,个人认为:
该方法用于等待进程池中的进程完成所有任务(包括正在执行的任务和_taskqueue中等待执行的任务)后退出,该方法必须在调用了close()之后再调用。
如果使用apply_async()后,未调用该方法,则任务无法被正常执行完毕,因为主进程会先行运行完毕,一旦如此,则子进程必然挂掉。