Python-multiprocessing进程管理

钱华晖
2023-12-01

multiprocessing模块包含一个API,它基于threading API可以在多个进程间划分工作。有些情况下,multiprocessing可以作为临时替换,取代threading来利用多个CPU内核,避免全局解释器锁带来的性能瓶颈。

1. multiprocessing基础


创建进程(MP.Process)

要创建第二个进程,最简单的方法是实例化一个Process对象,并调用start()让其工作。
import multiprocessing

def worker():
    print 'Worker'
    return

if __name__ == '__main__':
    jobs = []
    for i in range(5):
        p = multiprocessing.Process(target = worker)
        jobs.append(p)
        p.start()

执行结果将会打印5次‘Worker',不过不清楚孰先孰后,这取决于具体的执行顺序,因为每个进程都在竞争访问输出流。更有用的做法是,创建一个进程时可以提供参数。与threading不同,要向一个multiprocessing Process传递参数,这个参数必须能够使用pickle串行化。下面的例子向各个工作进程传递一个要打印的数。
import multiprocessing
import time

def worker(num):
    print "Worker", num
    time.sleep(0.1)
    return

if __name__ == '__main__':
    jobs = []
    for i in range(5):
        p = multiprocessing.Process(target = worker, args = (i,))
        jobs.append(p)
        p.start()
Worker 0
Worker 2
Worker 3
Worker 4
Worker 1

可导入的目标函数

threading与multiprocessing例子之间有一个区别,multiprocessing例子中对__main__使用了额外的保护。对于新进程的启动方式,要求子进程能够导入包含目标函数的脚本。可以讲应用的主要部分包装在一个__main__检查中,确保模块导入时不会在各个子进程中递归地运行。另外一个方法是从一个单独的脚本中导入目标函数,下面例子中进程的工作函数是simple.py中worker函数:
import multiprocessing
import simple

if __name__ == '__main__':
    jobs = []
    for i in range(5):
        p = multiprocessing.Process(
                target = simple.worker,
                )
        jobs.append(p)
        p.start()

确定当前进程(MP.current_process().name)

每个Process实例都有一个名称,其默认值可以在创建进程时改变。给进程命名对于跟踪进程很有用,特别是在当前应用中有多种类型的进程同时运行时。
import multiprocessing
import time

def worker():
    name = multiprocessing.current_process().name
    print name, 'Starting'
    time.sleep(2)
    print name, 'Exiting'

def my_device():
    name = multiprocessing.current_process().name
    print name, 'Starting'
    time.sleep(3)
    print name, 'Exiting'

if __name__ == '__main__':
    service = multiprocessing.Process(name = 'my_service',target = my_device)
    worker_1 = multiprocessing.Process(name = 'worker 1', target = worker)
    worker_2 = multiprocessing.Process(target = worker) #default name, or set name by worker_2.name = 'worker_2'

    worker_1.start()
    worker_2.start()
    service.start()
进程名称为Process-3的行对应未命名的进程worker_2。
worker 1 Starting
worker 1 Exiting
Process-3 Starting
Process-3 Exiting
my_service Starting
my_service Exiting

守护进程(P.daemon=True)

要标志一个进程为守护进程,可以将其daemon属性设置为True,默认情况下不为守护进程。
service = multiprocessing.Process(name = 'my_service',target = my_device)
service.daemon = True
主进程可以使用join()等待守护进程退出,也可以给join传入一个超时参数(浮点数,单位为秒),即使进程在这个超时范围内没有完成,join()也会返回,此时daemon进程会继续运行,不会终结。情况和threading一样。

终止进程(P.terminate())

尽管最好使用“毒丸”(posion pill)方法向进程发出信号让其退出,但是如果一个进程看起来已经挂起或者陷入死锁,则需要能够强制性的结束。对一个进程调用terminate()会结束子进程。
import multiprocessing
import time

def slow_worker():
    print 'Starting worker'
    time.sleep(10)
    print 'finished worker'

if __name__ == '__main__':
    p = multiprocessing.Process(target = slow_worker)
    print "Befor:", p, p.is_alive()
    p.start()
    print 'During:', p, p.is_alive()
    p.terminate()
    print 'Terminate:', p, p.is_alive()
    p.join()
    print 'Joined:', p, p.is_alive()
Befor: <Process(Process-1, initial)> False
During: <Process(Process-1, started)> True
Terminate: <Process(Process-1, started)> True
Joined: <Process(Process-1, stopped[SIGTERM])> False
注意:终止进程后要使用join()退出进程,使进程管理代码有时间更新对象的状态,以反映进程已经终止。

进程的退出状态(P.exitcode)

进程退出时声称的状态码可以通过exitcode属性访问。状态码的范围是:
  • == 0 : 未生成任何错误
  • >0     : 进程有一个错误,并以该错误码退出
  • <0     :进程由一个-1*exitcode信号结束

2. 日志

调试并发问题时,能够访问multiprocessing提供的对象的内部状态很有用。可以使用一个方便的模块级函数来启动日志记录,名为 log_to_stderr()。它使用logging建立一个日志记录器对象,并增加一个处理程序,使得日志消息将发送到标准错误通道,日志默认格式为  '[%(levelname)s/%(processName)s] %(message)s’
import multiprocessing
import logging
import sys

def worker():
    print "Doing some work"
    sys.stdout.flush()

if __name__ == '__main__':
    multiprocessing.log_to_stderr(logging.DEBUG)
    p = multiprocessing.Process(target = worker)
    p.start()
    p.join()
默认情况下,日志级别设置为NOTSET,即不产生任何消息。通过传入一个不同的日志级别,可以初始化日志记录器,制定所需的详细程度。
[INFO/Process-1] child process calling self.run()
Doing some work
[INFO/Process-1] process shutting down
[DEBUG/Process-1] running all "atexit" finalizers with priority >= 0
[DEBUG/Process-1] running the remaining "atexit" finalizers
[INFO/Process-1] process exiting with exitcode 0
[INFO/MainProcess] process shutting down
[DEBUG/MainProcess] running all "atexit" finalizers with priority >= 0
[DEBUG/MainProcess] running the remaining "atexit" finalizers

要直接处理日志记录器(修改日志级别或添加处理程序),可以使用 get_logger()
import multiprocessing
import logging
import sys

def worker():
    print 'Doing some work'
    sys.stdout.flush()

if __name__ == '__main__':
    multiprocessing.log_to_stderr()
    logger = multiprocessing.get_logger()
    logger.setLevel(logging.INFO)
    p = multiprocessing.Process(target = worker)
    p.start()
    p.join()
[INFO/Process-1] child process calling self.run()
Doing some work
[INFO/Process-1] process shutting down
[INFO/Process-1] process exiting with exitcode 0
[INFO/MainProcess] process shutting down

3. 派生进程

名字听上去挺唬人的,其实就是把multiprocessing.Process作为基类来创建新的class。
import multiprocessing

class Worker(multiprocessing.Process):
def run(self):
	print "In %s' % self.name
	return

if __name__ == '__main__':
	jobs = []
	for i in range(5):
		p = Worker()
		jobs.append(p)
		p.start()
	for j in jobs:
		j.join()
In Worker-1
In Worker-2
In Worker-3
In Worker-5
In Worker-4
派生进程应当覆盖run()来完成工作

4. 消息传递

类似于线程,对于多个进程,一种常用的模式是将一个工作划分为多个工作进程并行地运行。要想有效地使用多个进程,通常它们之间有某种通信,这样才能有效分解工作,并完成结果的汇总。利用multiprocessing完成进程通信的一种简单方法是使用一个Queue来传递消息。能够用pickle串行化的任何对象都可以通过Queue传递。
import multiprocessing

class MyFancyClass(object):

    def __init__(self, name):
        self.name = name

    def do_something(self):
        proc_name = multiprocessing.current_process().name
        print 'Doing something fancy in %s for %s' % (proc_name, self.name)

def worker(q):
    obj = q.get() # get a object from the queue
    obj.do_something() # doing the task of the object

if __name__ == '__main__':
    queue = multiprocessing.Queue() # the queue
    p = multiprocessing.Process(target=worker, args=(queue,))
    p.start()

    queue.put(MyFancyClass('Fancy Dan')) # put one object to the queue

    # Wait for the worker to finish
    queue.close()
    queue.join_thread()
    p.join()
这个例子只是像一个工作进程传递一个消息,然后主进程等待这个工作进程完成。结果为:
Doing something fancy in Process-1 for Fancy Dan

5. 线程池

Multiprocessing同样提供了线程池,可设置线程池大小等。
from multiprocessing import Pool

def my_func(x):
    print x**2

pool = Pool(processes=5)
target = range(10)
pool.map(my_func, target)
上述代码定义了一个5个线程大小的线程池,用以计算每个数的平方。


 类似资料: