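Official documentation: https://docs.python.org/3.7/library/multiprocessing.html#using-a-pool-of-workers
# multiprocessing: process-based parallelism
# Introduction to multiprocessing
# multiprocessing is a package that supports spawning processes using an API similar to the threading module.
# multiprocessing offers both local and remote concurrency, effectively side-stepping the Global Interpreter Lock by using subprocesses instead of threads.
# As a result, the multiprocessing module lets the programmer make full use of multiple processors on a given machine. It runs on both Unix and Windows.
# Process pools in the multiprocessing package
# A Pool runs the same task on multiple worker processes in parallel. Example: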
from multiprocessing import Pool
def f1(x):
    return x*x
def main1():
    with Pool(5) as p:
        # map() distributes the inputs over the 5 workers and returns the results in input order
        print(p.map(f1, [1,2,3,4,5,6,7,8,9,10]))
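# multiprocessing supports three ways to start a process
# spawn: the parent process starts a fresh Python interpreter process. The child inherits only the resources needed to run the process object's run() method.
# Unnecessary file descriptors and handles from the parent are not inherited. Starting a process this way is rather slow compared to fork or forkserver.
# Available on Unix and Windows.
# fork: the parent process uses os.fork() to fork the Python interpreter. The child process, when it begins, is effectively identical to the parent.
# All resources of the parent are inherited by the child. Note that safely forking a multithreaded process is problematic.
# Available on Unix only.
# forkserver: when the program starts and selects the forkserver start method, a server process is started.
# From then on, whenever a new process is needed, the parent connects to the server and asks it to fork a new process.
# The fork server process is single-threaded, so it is safe for it to use os.fork(). No unnecessary resources are inherited.
# Available on Unix platforms that support passing file descriptors over Unix pipes.
# Usage: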
import multiprocessing as mp
def foo(q):
    q.put("hello")
def main2():
    mp.set_start_method("spawn")  # should be called at most once per program
    q = mp.Queue()
    p = mp.Process(target=foo, args=(q,))
    p.start()
    print(q.get())
    p.join()
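Because set_start_method() should be called only once, the multiprocessing documentation also describes get_context(), which returns a context object bound to one start method. The following is a minimal sketch of that approach; the names `greet` and `run_with` are illustrative and not part of the notes above.

```python
import multiprocessing as mp


def greet(q, method):
    # Runs in the child process and reports which start method launched it.
    q.put("hello from a %s child" % method)


def run_with(method):
    # get_context() returns a context object that offers the same API as the
    # multiprocessing module but is bound to one start method, so the global
    # default is left untouched.
    ctx = mp.get_context(method)
    q = ctx.Queue()
    p = ctx.Process(target=greet, args=(q, method))
    p.start()
    message = q.get()
    p.join()
    return message


if __name__ == "__main__":
    print(mp.get_all_start_methods())  # platform dependent
    print(run_with("spawn"))           # hello from a spawn child
```

Queues and processes created from one context should be used together; mixing objects from different contexts may not work.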
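# Inter-process communication: exchanging data
# multiprocessing supports two types of communication channel between processes
# Queue: similar to queue.Queue
# Queues are thread and process safe
# Pipe: the Pipe() function returns a pair of connection objects connected by a pipe, which is duplex (two-way) by default.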
from multiprocessing import Process, Queue
def f2(q):
    q.put([42, None, 'hello'])
def main3():
    q = Queue()
    p = Process(target=f2, args=(q,))
    p.start()
    print(q.get())    # prints "[42, None, 'hello']"
    p.join()
from multiprocessing import Process, Pipe
def f3(conn):
    conn.send([42, None, 'hello'])
    conn.close()
if __name__ == '__main__':
    parent_conn, child_conn = Pipe()  # returns the two ends of the pipe; each end has send() and recv() methods
    p = Process(target=f3, args=(child_conn,))
    p.start()
    print(parent_conn.recv())   # prints "[42, None, 'hello']"
    p.join()
Synchronization between processes
multiprocessing contains equivalents of all the synchronization primitives from threading
from multiprocessing import Process, Lock
lock = Lock()
lock.acquire()
lock.release()
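The acquire()/release() pair is usually written as a `with` block so the lock is released even if the body raises. A small sketch, assuming an illustrative function `report` that several processes call so that only one of them prints at a time:

```python
from multiprocessing import Lock, Process


def report(lock, i):
    # "with lock" acquires on entry and always releases on exit,
    # so output from different processes is not interleaved.
    with lock:
        print("hello world", i)


if __name__ == "__main__":
    lock = Lock()
    workers = [Process(target=report, args=(lock, i)) for i in range(3)]
    for p in workers:
        p.start()
    for p in workers:
        p.join()
```

The order in which the three lines appear depends on scheduling and is not guaranteed.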
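Sharing state between processes:
When doing concurrent programming it is usually best to avoid shared state as far as possible. This is particularly true when using multiple processes.
If shared data really is needed, multiprocessing provides a couple of ways of doing so.
1 Shared memory:
Data can be stored in a shared memory map using Value() or Array().
For more flexibility in using shared memory, use the multiprocessing.sharedctypes module, which supports creating arbitrary ctypes objects allocated from shared memory.
2 Server process (Manager):
The manager object returned by Manager() controls a server process that holds Python objects and lets other processes manipulate them through proxies.
A manager returned by Manager() supports these types: list, dict, Namespace, Lock, RLock, Semaphore, BoundedSemaphore, Condition, Event, Barrier, Queue, Value and Array.
Server process managers are more flexible than shared memory objects because they can be made to support arbitrary object types. A single manager can also be shared by processes on different computers over a network. They are, however, slower than shared memory.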
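The shared memory option can be sketched as follows. The function name `add_many` and the choice of four workers are assumptions made for illustration; the point is that every process updates the same Value and Array.

```python
from multiprocessing import Array, Process, Value


def add_many(counter, squares, n):
    for _ in range(n):
        # "+=" is a read-modify-write, so hold the Value's own lock around it.
        with counter.get_lock():
            counter.value += 1
    for i in range(len(squares)):
        squares[i] = i * i


if __name__ == "__main__":
    counter = Value("i", 0)   # typecode "i": signed int, initial value 0
    squares = Array("d", 5)   # typecode "d": double, 5 zeroed slots
    workers = [Process(target=add_many, args=(counter, squares, 1000))
               for _ in range(4)]
    for p in workers:
        p.start()
    for p in workers:
        p.join()
    print(counter.value)  # 4000
    print(squares[:])     # [0.0, 1.0, 4.0, 9.0, 16.0]
```

Without the get_lock() block the final count could come out lower than 4000, because increments from different processes can overwrite each other.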
Worker pool:
with Pool(processes=4) as pool:    # start 4 worker processes
    pool.map(f, range(10))    # results come back in input order
    for i in pool.imap_unordered(f, range(10)):
        print(i)    # results arrive in arbitrary order
    res = pool.apply_async(f, (20,))    # evaluate f(20) asynchronously
    res.get(timeout=1)    # fetch the result, waiting at most 1 second
Or:
pool = Pool(processes=4)
pool.map(f, range(10))
pool.close()
pool.join()
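Process and exceptions
Process objects have equivalents of all the methods of threading.Thread
class multiprocessing.Process(group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None)
If a subclass overrides the constructor, it must call the base class initializer (Process.__init__()) before doing anything else to the process.
run()    # the method representing the process's activity; by default it calls target
start()    # starts the process and arranges for run() to be invoked in the child process
join()    # blocks until the process terminates (or the optional timeout expires)
name    # attribute: the process's name, used for identification only
is_alive()    # returns whether the process is alive
daemon    # attribute: the daemon flag; must be set before start() is called
In addition to the threading.Thread API, Process objects support the following attributes and methods
pid    # attribute: the process ID (None before the process is started)
exitcode    # attribute: the child's exit code (None if the process has not yet terminated)
authkey    # attribute: the process's authentication key (a byte string)
sentinel    # attribute: a handle that becomes ready when the process ends
terminate()    # terminates the process (SIGTERM on Unix)
kill()    # same as terminate() but uses the SIGKILL signal on Unix
close()    # closes the Process object and releases its resources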
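The subclassing rule and the pid/exitcode attributes can be illustrated with a short sketch. The class name `Countdown` and the exit status 3 are arbitrary choices for the example.

```python
import sys
from multiprocessing import Process


class Countdown(Process):
    def __init__(self, start_from):
        # The base initializer must run before anything else touches the process.
        super().__init__(name="countdown")
        self.start_from = start_from

    def run(self):
        # start() arranges for run() to be invoked in the child process.
        for i in range(self.start_from, 0, -1):
            print(self.name, i)
        sys.exit(3)  # this status shows up as exitcode in the parent


if __name__ == "__main__":
    p = Countdown(3)
    print(p.pid, p.exitcode, p.is_alive())  # None None False
    p.start()
    p.join()
    print(p.exitcode)                       # 3
```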
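Pipes and Queues
When using multiple processes, one generally uses message passing for communication between processes and avoids having to use any synchronization primitives such as locks.
For passing messages one can use Pipe() (for a connection between two processes) or a queue (which allows multiple producers and consumers).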
The Queue, SimpleQueue and JoinableQueue types are multi-producer, multi-consumer FIFO queues modelled on the queue.Queue class in the standard library.
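multiprocessing.Pipe([duplex])
Returns a pair (conn1, conn2) of Connection objects representing the ends of a pipe.
duplex=True (the default): two-way pipe. duplex=False: one-way pipe, where conn1 can only receive and conn2 can only send.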
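The difference between the two duplex settings can be seen without starting a child process, since both ends of a pipe can be used from one process. A minimal sketch (the variable names are illustrative):

```python
from multiprocessing import Pipe

# duplex=True (the default): both ends can send and receive.
left, right = Pipe()
left.send("ping")
print(right.recv())   # ping
right.send("pong")
print(left.recv())    # pong

# duplex=False: the first end is receive-only, the second is send-only.
receiver, sender = Pipe(duplex=False)
sender.send([1, 2, 3])
print(receiver.recv())  # [1, 2, 3]
try:
    receiver.send("not allowed")
except OSError as exc:
    print("receive-only end cannot send:", exc)
```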
# Queue shared between processes
class multiprocessing.Queue([maxsize])
qsize()
empty()
full()
put()
put_nowait()
get()
get_nowait()
class multiprocessing.SimpleQueue
# A simplified queue type, very close to a locked pipe.
empty()
get()
put()
class multiprocessing.JoinableQueue([maxsize])
# JoinableQueue is a Queue subclass that additionally has task_done() and join() methods.
task_done()
join()
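A short sketch of how task_done() and join() fit together on a JoinableQueue. The `worker` function and the use of None as a stop sentinel are assumptions for the example, not something the notes prescribe.

```python
from multiprocessing import JoinableQueue, Process, Queue


def worker(tasks, results):
    while True:
        item = tasks.get()
        try:
            if item is None:  # sentinel: no more work
                break
            results.put(item * item)
        finally:
            # Every get() must be matched by task_done(), or join() never returns.
            tasks.task_done()


if __name__ == "__main__":
    tasks, results = JoinableQueue(), Queue()
    p = Process(target=worker, args=(tasks, results))
    p.start()
    for n in range(5):
        tasks.put(n)
    tasks.put(None)
    tasks.join()  # blocks until task_done() has been called for every item
    print(sorted(results.get() for _ in range(5)))  # [0, 1, 4, 9, 16]
    p.join()
```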
Connection Objects
Connection objects are usually created using Pipe()
class multiprocessing.connection.Connection
send(obj)
recv()
poll()
send_bytes(b'thank you')
recv_bytes()
recv_bytes_into()
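The byte-oriented methods listed above can be tried from a single process. This sketch assumes a 16-byte bytearray as the receive buffer; any writable buffer that is large enough should behave the same way.

```python
from multiprocessing import Pipe

a, b = Pipe()

print(b.poll())        # False: nothing to read yet
a.send_bytes(b"thank you")
print(b.poll(1.0))     # True: data is waiting
print(b.recv_bytes())  # b'thank you'

# recv_bytes_into() fills a writable buffer and returns the number of bytes read.
buf = bytearray(16)
a.send_bytes(b"hello")
n = b.recv_bytes_into(buf)
print(n, bytes(buf[:n]))  # 5 b'hello'
```

send()/recv() pickle whole Python objects, while send_bytes()/recv_bytes() move raw bytes as one complete message.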
Synchronization primitives
Generally, synchronization primitives are not as necessary in a multiprocess program as they are in a multithreaded program.
class multiprocessing.Barrier(parties[, action[, timeout]])
class multiprocessing.BoundedSemaphore([value])
class multiprocessing.Condition([lock])
class multiprocessing.Event()
class multiprocessing.Lock()    # non-recursive lock
class multiprocessing.RLock()    # recursive lock
acquire()
release()
class multiprocessing.Semaphore([value])
Shared ctypes Objects
It is possible to create shared objects using shared memory which can be inherited by child processes.
class multiprocessing.Value(typecode_or_type, *args, lock=True)
class multiprocessing.Array(typecode_or_type, size_or_initializer, *, lock=True)
The multiprocessing.sharedctypes module
class multiprocessing.sharedctypes.RawArray(typecode_or_type, size_or_initializer)
class multiprocessing.sharedctypes.RawValue(typecode_or_type, *args)
class multiprocessing.sharedctypes.Array(typecode_or_type, size_or_initializer, *, lock=True)
class multiprocessing.sharedctypes.Value(typecode_or_type, *args, lock=True)
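multiprocessing.sharedctypes.copy(obj)    # function: returns a ctypes object allocated from shared memory that is a copy of obj
multiprocessing.sharedctypes.synchronized(obj[, lock])    # function: returns a process-safe wrapper around a ctypes object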
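How the raw objects, synchronized() and copy() relate can be sketched in a single process. The values used here are arbitrary.

```python
import ctypes
from multiprocessing.sharedctypes import RawArray, RawValue, copy, synchronized

raw = RawValue(ctypes.c_int, 5)   # shared memory, no lock attached
safe = synchronized(raw)          # wrapper around the same memory, with a lock
with safe.get_lock():
    safe.value += 1               # also visible through raw
print(raw.value)                  # 6

snapshot = copy(raw)              # independent copy in new shared memory
snapshot.value = 100
print(raw.value, snapshot.value)  # 6 100

samples = RawArray("d", [1.0, 2.5])  # typecode "d": double
print(samples[:])                    # [1.0, 2.5]
```

Value() and Array() with lock=True give the same kind of synchronized wrapper directly, so the Raw variants are mainly useful when locking is handled elsewhere or not needed.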
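Managers
Managers provide a way to create data which can be shared between different processes,
including sharing over a network between processes running on different machines. A manager object controls a server process which manages shared objects.
Other processes can access the shared objects by using proxies.
multiprocessing.Manager()
Returns a started SyncManager object which can be used for sharing objects between processes.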
class multiprocessing.managers.BaseManager([address[, authkey]])
start()
get_server()
connect()
shutdown()
register()
address
authkey
Synchronization manager: Manager() returns an object of this type
class multiprocessing.managers.SyncManager    # a subclass of BaseManager
Barrier()
BoundedSemaphore()
Condition()
Event()
Lock()
Namespace()
Queue()
RLock()
Semaphore()
Array()
Value()
dict()
dict(mapping)
dict(sequence)
list()
list(sequence)
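A minimal sketch of sharing a dict and a list through a SyncManager. The function name `record` is illustrative; the objects it receives are proxies, and each operation on them is forwarded to the manager's server process.

```python
from multiprocessing import Manager, Process


def record(shared_dict, shared_list, key):
    # Works on proxies: each call is forwarded to the manager process.
    shared_dict[key] = key * 2
    shared_list.append(key)


if __name__ == "__main__":
    with Manager() as manager:
        d = manager.dict()
        l = manager.list()
        workers = [Process(target=record, args=(d, l, k)) for k in range(3)]
        for p in workers:
            p.start()
        for p in workers:
            p.join()
        print(sorted(d.items()))  # [(0, 0), (1, 2), (2, 4)]
        print(sorted(l[:]))       # [0, 1, 2]
```

Leaving the `with` block shuts the manager down, so the proxies should only be used inside it.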
Customized managers
To create one's own manager, create a subclass of BaseManager and use the register() classmethod to register new types or callables with the manager class.
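The registration step might look like the sketch below, which follows the pattern shown in the multiprocessing documentation. The names `MathsClass`, `MyManager` and the typeid "Maths" are illustrative.

```python
from multiprocessing.managers import BaseManager


class MathsClass:
    def add(self, x, y):
        return x + y

    def mul(self, x, y):
        return x * y


class MyManager(BaseManager):
    pass


# register() adds a method named after the typeid ("Maths") to the manager
# class; calling it creates the object in the server and returns a proxy.
MyManager.register("Maths", MathsClass)

if __name__ == "__main__":
    with MyManager() as manager:
        maths = manager.Maths()
        print(maths.add(4, 3))  # 7
        print(maths.mul(7, 8))  # 56
```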
Using a remote manager
It is possible to run a manager server on one machine and have clients use it from other machines.
Proxy Objects
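Process Pools
class multiprocessing.pool.Pool(processes=None, initializer=None, initargs=(), maxtasksperchild=None, context=None)
A process pool object which controls a pool of worker processes to which jobs can be submitted. It supports asynchronous results with timeouts and callbacks and has a parallel map implementation.
If processes is None, the number returned by os.cpu_count() is used as the number of worker processes.
apply(func, args=(), kwds={})    # calls func and blocks until the result is ready
apply_async(func, args=())    # non-blocking; returns an AsyncResult object
map(func, iterable)    # parallel equivalent of the built-in map(); blocks until all results are ready and returns them as a list in input order
map_async(func, iterable)    # asynchronous variant of map(); returns an AsyncResult object
imap(func, iterable, chunksize=1)    # a lazier version of map(); yields results in input order. For very long iterables a larger chunksize can be much faster than the default of 1
imap_unordered(func, iterable, chunksize=1)    # like imap(), but results are yielded in arbitrary order
starmap(func, iterable)    # like map(), but each element of iterable is itself an iterable that is unpacked as arguments; blocks and returns a list
starmap_async(func, iterable)    # combination of starmap() and map_async(); non-blocking, returns an AsyncResult object
close()    # prevents any more tasks from being submitted; the workers exit once all tasks are done
join()    # waits for the worker processes to exit; close() or terminate() must be called first
terminate()    # stops the worker processes immediately without completing outstanding work
class multiprocessing.pool.AsyncResult
The class of the result returned by Pool.apply_async() and Pool.map_async().
Call get() to retrieve the return value.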
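Besides get(), an AsyncResult also offers wait(), ready() and successful(). A small sketch, assuming an illustrative `slow_square` task that sleeps briefly so the timeout path has a chance to run:

```python
import multiprocessing
import time


def slow_square(x):
    time.sleep(0.2)
    return x * x


if __name__ == "__main__":
    with multiprocessing.Pool(processes=2) as pool:
        res = pool.apply_async(slow_square, (7,))
        try:
            # get() raises multiprocessing.TimeoutError if the result
            # has not arrived within the timeout (in seconds).
            print(res.get(timeout=0.01))
        except multiprocessing.TimeoutError:
            print("not ready yet")
        res.wait()               # block until the result is available
        print(res.ready())       # True
        print(res.successful())  # True: the call finished without raising
        print(res.get())         # 49

        many = pool.map_async(slow_square, range(4))
        print(many.get(timeout=5))  # [0, 1, 4, 9]
```

If the task raised an exception, get() would re-raise it in the parent and successful() would return False.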
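Listeners and Clients
Usually message passing between processes is done using queues or by using Connection objects returned by Pipe().
from multiprocessing import Process, Queue
def f(q):
    q.put('X' * 1000000)
if __name__ == '__main__':
    queue = Queue()
    p = Process(target=f, args=(queue,))
    p.start()
    p.join()    # this deadlocks
    obj = queue.get()
The code above deadlocks: the child cannot exit until the data it put on the queue has been flushed to the pipe, but the parent is waiting in join() and never reads it. The fix is to swap the last two lines so that queue.get() runs before p.join().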
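With the two lines swapped, the example reads as follows. The function is renamed `produce` here only to keep the sketch self-contained.

```python
from multiprocessing import Process, Queue


def produce(q):
    q.put("X" * 1000000)


if __name__ == "__main__":
    queue = Queue()
    p = Process(target=produce, args=(queue,))
    p.start()
    obj = queue.get()  # drain the queue first ...
    p.join()           # ... then join; the child can now flush its data and exit
    print(len(obj))    # 1000000
```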
Using a process pool:
import multiprocessing
import time
import random
import sys
#
# Functions used by test code
#
def calculate(func, args):
    result = func(*args)
    return '%s says that %s%s = %s' % (
        multiprocessing.current_process().name,
        func.__name__, args, result
        )
def calculatestar(args):
    return calculate(*args)
def mul(a, b):
    time.sleep(0.5 * random.random())
    return a * b
def plus(a, b):
    time.sleep(0.5 * random.random())
    return a + b
def f(x):
    return 1.0 / (x - 5.0)
def pow3(x):
    return x ** 3
def noop(x):
    pass
#
# Test code
#
def test():
    PROCESSES = 4
    print('Creating pool with %d processes\n' % PROCESSES)
    with multiprocessing.Pool(PROCESSES) as pool:
        #
        # Tests
        #
        TASKS = [(mul, (i, 7)) for i in range(10)] + \
                [(plus, (i, 8)) for i in range(10)]
        results = [pool.apply_async(calculate, t) for t in TASKS]
        imap_it = pool.imap(calculatestar, TASKS)
        imap_unordered_it = pool.imap_unordered(calculatestar, TASKS)
        print('Ordered results using pool.apply_async():')
        for r in results:
            print('\t', r.get())
        print()
        print('Ordered results using pool.imap():')
        for x in imap_it:
            print('\t', x)
        print()
        print('Unordered results using pool.imap_unordered():')
        for x in imap_unordered_it:
            print('\t', x)
        print()
        print('Ordered results using pool.map() --- will block till complete:')
        for x in pool.map(calculatestar, TASKS):
            print('\t', x)
        print()
        #
        # Test error handling
        #
        print('Testing error handling:')
        try:
            print(pool.apply(f, (5,)))
        except ZeroDivisionError:
            print('\tGot ZeroDivisionError as expected from pool.apply()')
        else:
            raise AssertionError('expected ZeroDivisionError')
        try:
            print(pool.map(f, list(range(10))))
        except ZeroDivisionError:
            print('\tGot ZeroDivisionError as expected from pool.map()')
        else:
            raise AssertionError('expected ZeroDivisionError')
        try:
            print(list(pool.imap(f, list(range(10)))))
        except ZeroDivisionError:
            print('\tGot ZeroDivisionError as expected from list(pool.imap())')
        else:
            raise AssertionError('expected ZeroDivisionError')
        it = pool.imap(f, list(range(10)))
        for i in range(10):
            try:
                x = next(it)
            except ZeroDivisionError:
                if i == 5:
                    pass
            except StopIteration:
                break
            else:
                if i == 5:
                    raise AssertionError('expected ZeroDivisionError')
        assert i == 9
        print('\tGot ZeroDivisionError as expected from IMapIterator.next()')
        print()
        #
        # Testing timeouts
        #
        print('Testing ApplyResult.get() with timeout:', end=' ')
        res = pool.apply_async(calculate, TASKS[0])
        while 1:
            sys.stdout.flush()
            try:
                sys.stdout.write('\n\t%s' % res.get(0.02))
                break
            except multiprocessing.TimeoutError:
                sys.stdout.write('.')
        print()
        print()
        print('Testing IMapIterator.next() with timeout:', end=' ')
        it = pool.imap(calculatestar, TASKS)
        while 1:
            sys.stdout.flush()
            try:
                sys.stdout.write('\n\t%s' % it.next(0.02))
            except StopIteration:
                break
            except multiprocessing.TimeoutError:
                sys.stdout.write('.')
        print()
        print()
if __name__ == '__main__':
    multiprocessing.freeze_support()
    test()
Creating pool with 4 processes
Ordered results using pool.apply_async():
SpawnPoolWorker-1 says that mul(0, 7) = 0
SpawnPoolWorker-2 says that mul(1, 7) = 7
SpawnPoolWorker-2 says that mul(2, 7) = 14
SpawnPoolWorker-3 says that mul(3, 7) = 21
SpawnPoolWorker-4 says that mul(4, 7) = 28
SpawnPoolWorker-2 says that plus(0, 8) = 8
SpawnPoolWorker-1 says that plus(1, 8) = 9
SpawnPoolWorker-4 says that plus(2, 8) = 10
SpawnPoolWorker-3 says that plus(3, 8) = 11
SpawnPoolWorker-4 says that plus(4, 8) = 12
Ordered results using pool.imap():
SpawnPoolWorker-1 says that mul(0, 7) = 0
SpawnPoolWorker-2 says that mul(1, 7) = 7
SpawnPoolWorker-2 says that mul(2, 7) = 14
SpawnPoolWorker-2 says that mul(3, 7) = 21
SpawnPoolWorker-3 says that mul(4, 7) = 28
SpawnPoolWorker-4 says that plus(0, 8) = 8
SpawnPoolWorker-1 says that plus(1, 8) = 9
SpawnPoolWorker-2 says that plus(2, 8) = 10
SpawnPoolWorker-1 says that plus(3, 8) = 11
SpawnPoolWorker-2 says that plus(4, 8) = 12
Unordered results using pool.imap_unordered():
SpawnPoolWorker-3 says that mul(0, 7) = 0
SpawnPoolWorker-3 says that mul(3, 7) = 21
SpawnPoolWorker-4 says that mul(1, 7) = 7
SpawnPoolWorker-3 says that mul(4, 7) = 28
SpawnPoolWorker-1 says that mul(2, 7) = 14
SpawnPoolWorker-2 says that plus(0, 8) = 8
SpawnPoolWorker-3 says that plus(2, 8) = 10
SpawnPoolWorker-4 says that plus(1, 8) = 9
SpawnPoolWorker-1 says that plus(3, 8) = 11
SpawnPoolWorker-2 says that plus(4, 8) = 12
Ordered results using pool.map() --- will block till complete:
SpawnPoolWorker-3 says that mul(0, 7) = 0
SpawnPoolWorker-4 says that mul(1, 7) = 7
SpawnPoolWorker-1 says that mul(2, 7) = 14
SpawnPoolWorker-2 says that mul(3, 7) = 21
SpawnPoolWorker-4 says that mul(4, 7) = 28
SpawnPoolWorker-2 says that plus(0, 8) = 8
SpawnPoolWorker-4 says that plus(1, 8) = 9
SpawnPoolWorker-1 says that plus(2, 8) = 10
SpawnPoolWorker-2 says that plus(3, 8) = 11
SpawnPoolWorker-3 says that plus(4, 8) = 12
Testing error handling:
Got ZeroDivisionError as expected from pool.apply()
Got ZeroDivisionError as expected from pool.map()
Got ZeroDivisionError as expected from list(pool.imap())
Got ZeroDivisionError as expected from IMapIterator.next()
Testing ApplyResult.get() with timeout: ....
SpawnPoolWorker-2 says that mul(0, 7) = 0
Testing IMapIterator.next() with timeout: ...
SpawnPoolWorker-1 says that mul(0, 7) = 0..........
SpawnPoolWorker-3 says that mul(1, 7) = 7
SpawnPoolWorker-4 says that mul(2, 7) = 14
SpawnPoolWorker-2 says that mul(3, 7) = 21
SpawnPoolWorker-1 says that mul(4, 7) = 28
SpawnPoolWorker-2 says that plus(0, 8) = 8...
SpawnPoolWorker-1 says that plus(1, 8) = 9
SpawnPoolWorker-2 says that plus(2, 8) = 10..........
SpawnPoolWorker-4 says that plus(3, 8) = 11
SpawnPoolWorker-3 says that plus(4, 8) = 12
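# As the output shows, the Pool methods differ in how results are ordered:
pool.apply_async(): each call returns its own AsyncResult; the results print in order here because get() is called on them in submission order
pool.imap(): results are yielded in the order of the input iterable
pool.imap_unordered(): results are yielded in arbitrary order, as each one becomes ready
pool.map(): results are in input order, but the call blocks until every task has finished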
import time
import random
from multiprocessing import Process, Queue, current_process, freeze_support
#
# Function run by worker processes
#
def worker(input, output):
    for func, args in iter(input.get, 'STOP'):    # keep calling input.get() until it returns the sentinel 'STOP'
        result = calculate(func, args)
        output.put(result)
#
# Function used to calculate result
#
def calculate(func, args):
    result = func(*args)
    return '%s says that %s%s = %s' % \
        (current_process().name, func.__name__, args, result)
#
# Functions referenced by tasks
#
def mul(a, b):
    time.sleep(0.5*random.random())
    return a * b
def plus(a, b):
    time.sleep(0.5*random.random())
    return a + b
#
#
#
def test():
    NUMBER_OF_PROCESSES = 4
    TASKS1 = [(mul, (i, 7)) for i in range(5)]
    TASKS2 = [(plus, (i, 8)) for i in range(5)]
    # Create queues
    task_queue = Queue()
    done_queue = Queue()
    # Submit tasks
    for task in TASKS1:
        task_queue.put(task)
    # Start worker processes
    for i in range(NUMBER_OF_PROCESSES):
        Process(target=worker, args=(task_queue, done_queue)).start()
    # Get and print results
    print('Unordered results:')
    for i in range(len(TASKS1)):
        print('\t', done_queue.get())
    # Add more tasks using `put()`
    for task in TASKS2:
        task_queue.put(task)
    # Get and print some more results
    for i in range(len(TASKS2)):
        print('\t', done_queue.get())
    # Tell child processes to stop
    for i in range(NUMBER_OF_PROCESSES):
        task_queue.put('STOP')    # one 'STOP' sentinel per worker process, so that every worker exits
if __name__ == '__main__':
    freeze_support()
    test()
Unordered results:
Process-2 says that mul(1, 7) = 7
Process-1 says that mul(0, 7) = 0
Process-4 says that mul(4, 7) = 28
Process-2 says that mul(3, 7) = 21
Process-3 says that mul(2, 7) = 14
Process-3 says that plus(3, 8) = 11
Process-1 says that plus(0, 8) = 8
Process-3 says that plus(4, 8) = 12
Process-2 says that plus(2, 8) = 10
Process-4 says that plus(1, 8) = 9