当前位置: 首页 > 工具软件 > Python_Sync > 使用案例 >

Python多任务学习笔记(9)——进程池即进程池对象的apply、apply_sync等方法简介

欧阳玺
2023-12-01

1. 进程池简介

Python多任务学习笔记(7)——进程中,我们知道进程是Python中实现多任务的另一种方式,但是也了解到由于进程一般占用资源较大,当有很多个任务需要同时完成,如果一开始就创建和任务数相同的进程,将不仅导致创建进程的代码冗余量大,还将造成大量资源占用。

基于以上原因,Python中引入了进程池,可以调用multiprocessing包中的Pool()函数来进行创建。使用Pool()函数创建进程池时可以指定进程池能自动创建的进程最大数目(由于进程占用资源较大且设备的内存、CPU等资源有限,该数量通常由测试人员根据对设备的测试所得出的最优数值确定),这样后续当有新任务需要通过进程完成时,如果当进程池未满时,就会自动使用新的进程来完成该任务,如果进程池已满时,新任务将等待进程池中的任务结束后,使用已有进程完成任务。

2. 进程池示例代码

下面代码实现:通过创建一个最多可容纳3个进程的进程池,完成10个任务。

from multiprocessing import Pool
import os
import time
import random


def work(task_num):
    t_start = time.time()

	# 使用os模块中的getpid()函数,可以获取当前运行的进程ID(PID:Process ID)
    print("%d号任务开始执行,进程号为%d" % (task_num, os.getpid()))

    time.sleep(random.random() * 2)

    t_stop = time.time()

    print("%d号任务执行完毕,耗时%0.2f" % (task_num, t_stop - t_start))


def main():
    # 创建一个进程池,指定最大容量为3
    pool = Pool(3)
    for each in range(0, 10):
        pool.apply_async(func=work, args=(each,))

    print("Before calling close()...")
    pool.close()
    pool.join()
    print("After calling join()...")


if __name__ == "__main__":
    main()

运行上述代码后,可得到以下结果:

Before calling close()…
0号任务开始执行,进程号为10206
1号任务开始执行,进程号为10207
2号任务开始执行,进程号为10208
2号任务执行完毕,耗时0.83
3号任务开始执行,进程号为10208
1号任务执行完毕,耗时1.62
4号任务开始执行,进程号为10207
0号任务执行完毕,耗时1.65
5号任务开始执行,进程号为10206
5号任务执行完毕,耗时0.05
6号任务开始执行,进程号为10206
3号任务执行完毕,耗时1.39
7号任务开始执行,进程号为10208
7号任务执行完毕,耗时0.17
8号任务开始执行,进程号为10208
4号任务执行完毕,耗时1.39
9号任务开始执行,进程号为10207
6号任务执行完毕,耗时1.37
8号任务执行完毕,耗时0.96
9号任务执行完毕,耗时1.74
After calling join()…

分析上述程序运行输出,可知:程序执行后,由于任务数大于进程池最大容量,故系统很快创建了3个进程,且进程ID(pid)分别为10206、10207、10208,而后续的任务执行时,其进程ID都是这3个之一,故进程池创建的进程完成了当前任务后,当仍有未完成任务时,这些任务将复用进程池中处于空闲状态的进程

3. 进程池对象方法

上述示例代码中使用到了多个进程池对象方法,下面结合Python官方文档,详细分析各方法作用,以及上述代码需要强调之处。

3.1 apply(func[, args[, kwds]])

该方法将在一个新的进程中调用func指定的函数,并通过args元组形式func中传递多值参数,通过kwds字典形式func中传递多键值对。由于该函数采用同步调用机制,故其会等待func执行完毕才返回。

下面是使用apply()方法的示例代码:

import multiprocessing
import os
import time
import random


def work(*args, **kwargs):
    t_start = time.time()

    print("%d号任务开始执行,进程号为%d" % (args[0], os.getpid()))

    time.sleep(random.random() * 2)

    t_stop = time.time()

    print(kwargs)

    print("%d号任务执行完毕,耗时%0.2f" % (args[0], t_stop - t_start))

    print()  # 换行使结果更清晰


def main():
    # 定义一个进程池,指定最大容量3
    process_pool = multiprocessing.Pool(processes=3)
    for each in range(0, 5):
        process_pool.apply(func=work,
                           args=(each,), kwds={"key": "value"})

    process_pool.close()


if __name__ == "__main__":
    main()

上述代码的运行结果为:

0号任务开始执行,进程号为6697
{‘key’: ‘value’}
0号任务执行完毕,耗时0.01


1号任务开始执行,进程号为6698
{‘key’: ‘value’}
1号任务执行完毕,耗时1.85


2号任务开始执行,进程号为6699
{‘key’: ‘value’}
2号任务执行完毕,耗时1.39


3号任务开始执行,进程号为6697
{‘key’: ‘value’}
3号任务执行完毕,耗时1.44


4号任务开始执行,进程号为6698
{‘key’: ‘value’}
4号任务执行完毕,耗时1.40

由上述代码的运行结果可知,由于apply()方法的同步调用机制,使用进程池执行的5项任务是顺序执行的。

3.2 apply_async(func[, args[, kwds[, callback[, error_callback]]]])

  • 此为apply()方法的变体,即该方法为异步调用,也就是说调用了该方法后,其会立马返回一个AsyncResult 对象,而该对象又具有一个采用同步调用机制的方法get(),用于等待func返回并获取其返回值,即pool_obj.apply_async()等价于pool_obj.apply().get()

  • 当指定callback参数时,其应当是一个仅接受一个参数(一个元组或字典,当作为一个整体,也都算是一个参数)的可调用对象(如函数引用),当func运行完成,则callback被调用。

  • 当上述callback调用失败,且指定了error_callback(同样应当是一个仅接受一个参数的可调用对象(如函数引用)),则调用error_callback,若调用失败,则抛出异常。

下面是使用apply_async()方法的示例代码(关于代码中为什么要使用join()方法,请看下面关于该方法的说明):

import multiprocessing
import os
import time
import random

result_list = list()


def log_result(result):
    result_list.append(result)


def work(*args, **kwargs):
    t_start = time.time()

    print("%d号任务开始执行,进程号为%d" % (args[0], os.getpid()))

    time.sleep(random.random() * 2)

    t_stop = time.time()

    print(kwargs)

    print("%d号任务执行完毕,耗时%0.2f" % (args[0], t_stop - t_start))

    print()  # 换行使结果更清晰

    return args[0]


def main():
    # 定义一个进程池,指定最大容量3
    process_pool = multiprocessing.Pool(processes=3)
    for each in range(0, 5):
        process_pool.apply_async(func=work,
                                 args=(each,), kwds={"key": "value"},
                                 callback=log_result)

    process_pool.close()
    process_pool.join()

    print(result_list)


if __name__ == "__main__":
    main()

上述代码运行结果为:

0号任务开始执行,进程号为7144
1号任务开始执行,进程号为7145
2号任务开始执行,进程号为7146
{‘key’: ‘value’}
0号任务执行完毕,耗时0.16


3号任务开始执行,进程号为7144
{‘key’: ‘value’}
2号任务执行完毕,耗时0.73


4号任务开始执行,进程号为7146
{‘key’: ‘value’}
4号任务执行完毕,耗时0.28


{‘key’: ‘value’}
1号任务执行完毕,耗时1.55


{‘key’: ‘value’}
3号任务执行完毕,耗时1.71


[0, 2, 4, 1, 3]

结合上述代码的运行结果可知:

  • 使用apply_async()方法向进程池中提交任务时,由于该方法采用异步调用机制(即该方法被调用后,会立马返回一个结果,不会等待func的执行结束再返回),所以会很快创建指定数量的进程,然后各个进程同时开始执行任务。
  • func成功执行完后,log_result()将被调用,且仅接受的一个参数为func的返回值。

3.3 close()

调用该方法后,进程池对象将不再接受提交任务的申请。

该方法并不是关闭进程池,可以将其理解为关闭进入进程池的入口,而在调用close()方法之前,即使进程池中所有的进程都被任务占据,依然可以将任务提交至进程池中,关于原因,翻看apply_async()的源码后,个人认为:

  • 因为进程池对象中有一个私有属性_taskqueue,当调用apply_async()方法时,funcargskwds等参数均会被添加至_taskqueue队列;
  • 所以调用apply_async()方法后,所有的待执行任务都将被添加至进程池对象的私有属性_taskqueue中,如果进程池中有空闲进程,则任务将立马得到执行,否则,任务继续在队列中等待进程中有进程空闲出来。

3.4 join()

该方法用于等待进程池中的进程完成所有任务(包括正在执行的任务和_taskqueue中等待执行的任务)后退出,该方法必须在调用了close()之后再调用。

如果使用apply_async()后,未调用该方法,则任务无法被正常执行完毕,因为主进程会先行运行完毕,一旦如此,则子进程必然挂掉。

 类似资料: