multiprocessing模块包含一个API,它基于threading API可以在多个进程间划分工作。有些情况下,multiprocessing可以作为临时替换,取代threading来利用多个CPU内核,避免全局解释器锁带来的性能瓶颈。
import multiprocessing
def worker():
print 'Worker'
return
if __name__ == '__main__':
jobs = []
for i in range(5):
p = multiprocessing.Process(target = worker)
jobs.append(p)
p.start()
import multiprocessing
import time
def worker(num):
print "Worker", num
time.sleep(0.1)
return
if __name__ == '__main__':
jobs = []
for i in range(5):
p = multiprocessing.Process(target = worker, args = (i,))
jobs.append(p)
p.start()
Worker 0
import multiprocessing
import simple
if __name__ == '__main__':
jobs = []
for i in range(5):
p = multiprocessing.Process(
target = simple.worker,
)
jobs.append(p)
p.start()
import multiprocessing
import time
def worker():
name = multiprocessing.current_process().name
print name, 'Starting'
time.sleep(2)
print name, 'Exiting'
def my_device():
name = multiprocessing.current_process().name
print name, 'Starting'
time.sleep(3)
print name, 'Exiting'
if __name__ == '__main__':
service = multiprocessing.Process(name = 'my_service',target = my_device)
worker_1 = multiprocessing.Process(name = 'worker 1', target = worker)
worker_2 = multiprocessing.Process(target = worker) #default name, or set name by worker_2.name = 'worker_2'
worker_1.start()
worker_2.start()
service.start()
service = multiprocessing.Process(name = 'my_service',target = my_device)
service.daemon = True
主进程可以使用join()等待守护进程退出,也可以给join传入一个超时参数(浮点数,单位为秒),即使进程在这个超时范围内没有完成,join()也会返回,此时daemon进程会继续运行,不会终结。情况和threading一样。
import multiprocessing
import time
def slow_worker():
print 'Starting worker'
time.sleep(10)
print 'finished worker'
if __name__ == '__main__':
p = multiprocessing.Process(target = slow_worker)
print "Befor:", p, p.is_alive()
p.start()
print 'During:', p, p.is_alive()
p.terminate()
print 'Terminate:', p, p.is_alive()
p.join()
print 'Joined:', p, p.is_alive()
Befor: <Process(Process-1, initial)> False
import multiprocessing
import logging
import sys
def worker():
print "Doing some work"
sys.stdout.flush()
if __name__ == '__main__':
multiprocessing.log_to_stderr(logging.DEBUG)
p = multiprocessing.Process(target = worker)
p.start()
p.join()
import multiprocessing
import logging
import sys
def worker():
print 'Doing some work'
sys.stdout.flush()
if __name__ == '__main__':
multiprocessing.log_to_stderr()
logger = multiprocessing.get_logger()
logger.setLevel(logging.INFO)
p = multiprocessing.Process(target = worker)
p.start()
p.join()
import multiprocessing
class Worker(multiprocessing.Process):
def run(self):
print "In %s' % self.name
return
if __name__ == '__main__':
jobs = []
for i in range(5):
p = Worker()
jobs.append(p)
p.start()
for j in jobs:
j.join()
import multiprocessing
class MyFancyClass(object):
def __init__(self, name):
self.name = name
def do_something(self):
proc_name = multiprocessing.current_process().name
print 'Doing something fancy in %s for %s' % (proc_name, self.name)
def worker(q):
obj = q.get() # get a object from the queue
obj.do_something() # doing the task of the object
if __name__ == '__main__':
queue = multiprocessing.Queue() # the queue
p = multiprocessing.Process(target=worker, args=(queue,))
p.start()
queue.put(MyFancyClass('Fancy Dan')) # put one object to the queue
# Wait for the worker to finish
queue.close()
queue.join_thread()
p.join()
这个例子只是像一个工作进程传递一个消息,然后主进程等待这个工作进程完成。结果为:
from multiprocessing import Pool
def my_func(x):
print x**2
pool = Pool(processes=5)
target = range(10)
pool.map(my_func, target)
上述代码定义了一个5个线程大小的线程池,用以计算每个数的平方。