Python multiprocessing

游安康
2023-12-01

Official documentation: https://docs.python.org/3.7/library/multiprocessing.html#using-a-pool-of-workers

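# multiprocessing: process-based parallelism
# Introduction
# multiprocessing is a package that supports spawning processes using an API similar to the threading module.
# It offers both local and remote concurrency, effectively side-stepping the Global Interpreter Lock (GIL) by using subprocesses instead of threads.
# As a result, the multiprocessing module lets the programmer fully leverage multiple processors on a given machine. It runs on both Unix and Windows.
# Process pool: run the same task in parallel across several worker processes. Example: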
from multiprocessing import Pool 

def f1(x):
    return x*x

def main1():
    with Pool(5) as p:
        print(p.map(f1, [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]))
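# multiprocessing supports three ways to start a process
# spawn:  The parent process starts a fresh Python interpreter process. The child inherits only the resources needed to run the process object's run() method.
#         Unnecessary file descriptors and handles from the parent are not inherited. Starting a process this way is rather slow compared with fork or forkserver.
#         Available on Unix and Windows.
# fork:  The parent process uses os.fork() to fork the Python interpreter. The child, when it begins, is effectively identical to the parent.
#         All resources of the parent are inherited by the child. Note that safely forking a multithreaded process is problematic.
#         Available on Unix only.
# forkserver: When the program starts and selects the forkserver start method, a server process is started.
#             From then on, whenever a new process is needed, the parent connects to the server and requests that it fork a new process.
#             The fork server process is single-threaded, so it is safe for it to use os.fork(). No unnecessary resources are inherited.
#             Available on Unix.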
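A start method can also be chosen per use through a context object rather than globally. The following is a minimal sketch, assuming a hypothetical helper named squares_with() and the spawn method, which is the one method listed above as available on both Unix and Windows:

```python
import multiprocessing as mp

def square(x):
    return x * x

def squares_with(method, values):
    """Hypothetical helper: run square() in a small pool using the given start method."""
    ctx = mp.get_context(method)  # a context object exposes the same API as the module
    with ctx.Pool(2) as pool:
        return pool.map(square, values)

if __name__ == "__main__":
    print(mp.get_all_start_methods())        # the methods supported on this platform
    print(squares_with("spawn", [1, 2, 3]))
```

Unlike set_start_method(), get_context() can be called repeatedly, which may be convenient for library code that should not change global state.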

# Usage:
import multiprocessing as mp
def foo(q):
    q.put("hello")

def main2():
    mp.set_start_method("spawn") # set_start_method() should not be called more than once in a program
    q = mp.Queue()
    p = mp.Process(target = foo,args=(q,))
    p.start()
    print(q.get())
    p.join()
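# Inter-process communication: exchanging data
# multiprocessing supports two types of communication channel between processes
# Queue: a near clone of queue.Queue
#         Queues are thread and process safe
# Pipe: the Pipe() function returns a pair of connection objects connected by a pipe, which is duplex (two-way) by default.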
from multiprocessing import Process, Queue

def f2(q):
    q.put([42, None, 'hello'])

def main3():
    q = Queue()
    p = Process(target=f2, args=(q,))
    p.start()
    print(q.get())    # prints "[42, None, 'hello']"
    p.join()

from multiprocessing import Process, Pipe

def f3(conn):
    conn.send([42, None, 'hello'])
    conn.close()

if __name__ == '__main__':
    parent_conn, child_conn = Pipe() # returns the two ends of the pipe; each end has send() and recv() methods
    p = Process(target=f3, args=(child_conn,))
    p.start()
    print(parent_conn.recv())   # prints "[42, None, 'hello']"
    p.join()
Synchronization between processes
multiprocessing contains equivalents of all the synchronization primitives from threading.
from multiprocessing import Process, Lock
lock = Lock()
lock.acquire()
lock.release()
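To illustrate how a lock is typically shared, here is a minimal sketch in which several processes print under one Lock so that their output lines do not interleave. The function name report() and the number of workers are illustrative assumptions:

```python
from multiprocessing import Lock, Process

def report(lock, i):
    # Only one process at a time can hold the lock.
    with lock:
        print("hello world", i)

if __name__ == "__main__":
    lock = Lock()
    workers = [Process(target=report, args=(lock, i)) for i in range(3)]
    for w in workers:
        w.start()
    for w in workers:
        w.join()
```

Using the lock as a context manager releases it even if the body raises, which is easier to get right than paired acquire()/release() calls.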
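Sharing state between processes:
When doing concurrent programming it is usually best to avoid shared state as far as possible. This is particularly true when using multiple processes.
If you really do need some shared data, multiprocessing provides a couple of ways of sharing it.
1 Shared memory:
    Data can be stored in a shared memory map using Value() or Array().
    For more flexibility in using shared memory, use the multiprocessing.sharedctypes module, which supports the creation of arbitrary ctypes objects allocated from shared memory.
2 Server process (Manager):
    The manager object returned by Manager() controls a server process that holds Python objects and allows other processes to manipulate them using proxies.
    A manager returned by Manager() supports these types: list, dict, Namespace, Lock, RLock, Semaphore, BoundedSemaphore, Condition, Event, Barrier, Queue, Value and Array

    Server process managers are more flexible than shared memory objects because they can be made to support arbitrary object types. In addition, a single manager can be shared by processes on different computers over a network. They are, however, slower than shared memory.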
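The shared-memory option can be sketched as follows. The function name update() and the values written are illustrative assumptions; the typecodes 'd' (double) and 'i' (signed int) are the ones used by the array module:

```python
from multiprocessing import Array, Process, Value

def update(number, values):
    # Writes made in the child are visible to the parent,
    # because both objects live in shared memory.
    number.value = 3.5
    for i in range(len(values)):
        values[i] = -values[i]

if __name__ == "__main__":
    number = Value("d", 0.0)
    values = Array("i", range(5))
    p = Process(target=update, args=(number, values))
    p.start()
    p.join()
    print(number.value)
    print(values[:])
```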
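Worker pool:
with Pool(processes=4) as pool:  # start 4 worker processes
    pool.map(f, range(10)) # results in input order

    for i in pool.imap_unordered(f, range(10)):
        print(i)    # results in arbitrary order

    res = pool.apply_async(f, (20,)) # evaluate f(20) asynchronously
    res.get(timeout=1) # fetch the result, waiting at most 1 second

Or: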
pool = Pool(processes=4)
pool.map(f, range(10))
pool.close()
pool.join()
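Process and exceptions
Process objects have equivalents of all the methods of threading.Thread.
class multiprocessing.Process(group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None)
If a subclass overrides the constructor, it must call the base class initializer (Process.__init__()) before doing anything else to the process.
    run()
    start() # starts the process and arranges for run() to be invoked in it; run() calls the target function
    join()
    name
    is_alive()
    daemon
    In addition to the threading.Thread API, Process objects support the following attributes and methods:
    pid # the process ID (None before the process is started)
    exitcode # the child's exit code (None while the process has not yet terminated)
    authkey   # the process's authentication key (a byte string)
    sentinel
    terminate() # terminate the process
    kill()  # same as terminate(), but uses the SIGKILL signal on Unix
    close() # close the Process object and release its resources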
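The subclassing rule and a few of the attributes above can be seen in a short sketch. The class Doubler and its arguments are hypothetical, chosen only for illustration:

```python
from multiprocessing import Process, Queue

class Doubler(Process):
    """Hypothetical Process subclass that doubles a number in the child."""

    def __init__(self, number, results):
        super().__init__(name="doubler")  # call the base initializer first
        self.number = number
        self.results = results

    def run(self):
        # Executed in the child process after start().
        self.results.put(self.number * 2)

if __name__ == "__main__":
    results = Queue()
    p = Doubler(21, results)
    print(p.name, p.pid, p.exitcode, p.is_alive())  # before start(): pid and exitcode are None
    p.start()
    print(results.get())
    p.join()
    print(p.exitcode, p.is_alive())                 # after join(): exit code set, no longer alive
```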
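Pipes and Queues
When using multiple processes, one generally uses message passing for communication between processes and avoids having to use any synchronization primitives such as locks.
For passing messages one can use Pipe() (for a connection between two processes) or a queue (which allows multiple producers and consumers).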
The Queue, SimpleQueue and JoinableQueue types are multi-producer, multi-consumer FIFO queues modelled on the queue.Queue class in the standard library.

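multiprocessing.Pipe([duplex])
    Returns a pair (conn1, conn2) of Connection objects representing the ends of a pipe.
    duplex=True (the default): the pipe is bidirectional. duplex=False: the pipe is unidirectional; conn1 can only receive and conn2 can only send.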
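A one-way pipe can be sketched like this. The function name produce() and the message are illustrative assumptions:

```python
from multiprocessing import Pipe, Process

def produce(sender):
    sender.send("ping")
    sender.close()

if __name__ == "__main__":
    # With duplex=False the first end can only receive and the second can only send.
    receiver, sender = Pipe(duplex=False)
    p = Process(target=produce, args=(sender,))
    p.start()
    print(receiver.recv())
    p.join()
```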

# A queue shared between processes
class multiprocessing.Queue([maxsize])
    qsize()
    empty()
    full()
    put()
    put_nowait()
    get()
    get_nowait()

class multiprocessing.SimpleQueue
    # A simplified Queue type, very close to a locked Pipe.
    empty()
    get()
    put()

class multiprocessing.JoinableQueue([maxsize])
    # JoinableQueue, a Queue subclass, is a queue which additionally has task_done() and join() methods.
    task_done()
    join()
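How task_done() and join() fit together can be sketched as follows. The function name consume() and the use of None as a stop sentinel are assumptions made for this example:

```python
from multiprocessing import JoinableQueue, Process

def consume(tasks):
    """Handle items until the None sentinel arrives; return how many were handled."""
    handled = 0
    while True:
        item = tasks.get()
        try:
            if item is None:
                return handled
            handled += 1
        finally:
            tasks.task_done()  # exactly one task_done() per get()

if __name__ == "__main__":
    tasks = JoinableQueue()
    worker = Process(target=consume, args=(tasks,))
    worker.start()
    for item in ["a", "b", "c"]:
        tasks.put(item)
    tasks.put(None)
    tasks.join()    # returns once task_done() has been called for every item put
    worker.join()
    print("all tasks done")
```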
Connection Objects
Connection objects are usually created using Pipe().
class multiprocessing.connection.Connection
    send(obj)
    recv()
    poll()
    send_bytes(b'thank you')
    recv_bytes()
    recv_bytes_into()
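A few of these Connection methods can be exercised in one process. This is a minimal sketch; echo_bytes() is a hypothetical helper and the one-second poll timeout is an arbitrary choice:

```python
from multiprocessing import Pipe

def echo_bytes(payload):
    """Hypothetical helper: send raw bytes through a pipe and read them back."""
    left, right = Pipe()
    try:
        left.send_bytes(payload)       # sends the bytes as one message, without pickling
        if right.poll(1.0):            # wait up to 1 second for data to arrive
            return right.recv_bytes()
        return None
    finally:
        left.close()
        right.close()

if __name__ == "__main__":
    print(echo_bytes(b"thank you"))
```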
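Synchronization primitives
Generally, synchronization primitives are not as necessary in a multiprocess program as they are in a multithreaded program.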
class multiprocessing.Barrier(parties[, action[, timeout]])
class multiprocessing.BoundedSemaphore([value])
class multiprocessing.Condition([lock])
class multiprocessing.Event()
class multiprocessing.Lock()  # non-recursive lock
class multiprocessing.RLock() # recursive lock
    acquire()
    release()
class multiprocessing.Semaphore([value])
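The difference between the non-recursive Lock and the recursive RLock can be checked without starting any child process. A minimal sketch, where can_reacquire() is a hypothetical helper:

```python
from multiprocessing import Lock, RLock

def can_reacquire(lock):
    """Return True if the current owner can acquire the lock a second time."""
    lock.acquire()
    try:
        got = lock.acquire(block=False)  # non-blocking second attempt
        if got:
            lock.release()
        return got
    finally:
        lock.release()

if __name__ == "__main__":
    print("Lock:", can_reacquire(Lock()))
    print("RLock:", can_reacquire(RLock()))
```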
Shared ctypes Objects
It is possible to create shared objects using shared memory which can be inherited by child processes.
multiprocessing.Value(typecode_or_type, *args, lock=True)
multiprocessing.Array(typecode_or_type, size_or_initializer, *, lock=True)
The multiprocessing.sharedctypes module

multiprocessing.sharedctypes.RawArray(typecode_or_type, size_or_initializer)
multiprocessing.sharedctypes.RawValue(typecode_or_type, *args)
multiprocessing.sharedctypes.Array(typecode_or_type, size_or_initializer, *, lock=True)
multiprocessing.sharedctypes.Value(typecode_or_type, *args, lock=True)
multiprocessing.sharedctypes.copy(obj)
multiprocessing.sharedctypes.synchronized(obj[, lock])
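As an illustration of sharing an arbitrary ctypes type, the sketch below places an array of structures in shared memory. The Point structure and the scale() function are assumptions made for this example:

```python
from ctypes import Structure, c_double
from multiprocessing import Process
from multiprocessing.sharedctypes import Array

class Point(Structure):
    _fields_ = [("x", c_double), ("y", c_double)]

def scale(points, factor):
    # get_lock() returns the lock that guards the synchronized array.
    with points.get_lock():
        for p in points:
            p.x *= factor
            p.y *= factor

if __name__ == "__main__":
    points = Array(Point, [(1.0, 2.0), (3.0, 4.0)])  # lock=True by default
    worker = Process(target=scale, args=(points, 2.0))
    worker.start()
    worker.join()
    print([(p.x, p.y) for p in points])
```

RawArray and RawValue skip the lock entirely, so they are only appropriate when access is already coordinated some other way.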
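Managers
Managers provide a way to create data which can be shared between different processes,
including sharing over a network between processes running on different machines. A manager object controls a server process which manages shared objects.
Other processes can access the shared objects by using proxies.
multiprocessing.Manager()
    Returns a started SyncManager object which can be used for sharing objects between processes.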
class multiprocessing.managers.BaseManager([address[, authkey]])
    start()
    get_server()
    connect()
    shutdown()
    register()
    address
    authkey

Synchronization manager: multiprocessing.Manager() returns an object of this type
class multiprocessing.managers.SyncManager  # a subclass of BaseManager
    Barrier()
    BoundedSemaphore()
    Condition()
    Event()
    Lock()
    Namespace()
    Queue()
    RLock()
    Semaphore()
    Array()
    Value()
    dict()
    dict(mapping)
    dict(sequence)
    list()
    list(sequence)  
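The dict() and list() factories listed above return proxies that can be passed to child processes. A minimal sketch, where record() and the stored values are illustrative assumptions:

```python
from multiprocessing import Manager, Process

def record(shared_dict, shared_list):
    # These calls are forwarded through proxies to the manager's server process.
    shared_dict["answer"] = 42
    shared_list.reverse()

if __name__ == "__main__":
    with Manager() as manager:
        d = manager.dict()
        l = manager.list(range(5))
        p = Process(target=record, args=(d, l))
        p.start()
        p.join()
        print(d)
        print(l)
```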

Customized managers
To create your own manager, create a subclass of BaseManager and use the register() classmethod to register new types or callables with the manager class.
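A customized manager might look like the following sketch. MathsClass, MyManager and the typeid 'Maths' are illustrative names, not part of the library:

```python
from multiprocessing.managers import BaseManager

class MathsClass:
    def add(self, x, y):
        return x + y

    def mul(self, x, y):
        return x * y

class MyManager(BaseManager):
    pass

# Registering a typeid adds a factory method of the same name to the manager class.
MyManager.register("Maths", MathsClass)

if __name__ == "__main__":
    with MyManager() as manager:
        maths = manager.Maths()   # a proxy; the real object lives in the server process
        print(maths.add(4, 3))
        print(maths.mul(7, 8))
```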

Using a remote manager
It is possible to run a manager server on one machine and have clients use it from other machines.
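The idea can be sketched on a single machine by running server and client against the loopback address. The names QueueManager, get_shared_queue() and get_queue, the placeholder authkey, and the address are all assumptions for illustration; a real deployment would bind the server to an address that the client machines can reach:

```python
import queue
from multiprocessing.managers import BaseManager

_shared_queue = queue.Queue()  # the copy in the server process is the one that gets used

def get_shared_queue():
    return _shared_queue

class QueueManager(BaseManager):
    pass

QueueManager.register("get_queue", callable=get_shared_queue)

if __name__ == "__main__":
    authkey = b"placeholder-key"  # hypothetical key, shared by server and clients
    server = QueueManager(address=("127.0.0.1", 0), authkey=authkey)  # port 0: pick a free port
    server.start()
    try:
        # A client on another machine would use the server's real address here.
        client = QueueManager(address=server.address, authkey=authkey)
        client.connect()
        client.get_queue().put("hello")
        print(client.get_queue().get())
    finally:
        server.shutdown()
```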
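Proxy Objects
Process Pools
class multiprocessing.pool.Pool(processes=None, initializer=None, initargs=(), maxtasksperchild=None, context=None)
A process pool object which controls a pool of worker processes to which jobs can be submitted. It supports asynchronous results with timeouts and callbacks and has a parallel map implementation.
If processes is None, the number returned by os.cpu_count() is used as the number of worker processes.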
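    apply(func, args=(), kwds={})  # blocks until the result is ready
    apply_async(func, args=())   # asynchronous, non-blocking; returns an AsyncResult object
    map(func, iterable) # blocks until all results are ready; the iterable is split into chunks that are processed in parallel, and the results are returned as a list in input order
    map_async(func, iterable) # asynchronous variant of map(); returns a result object
    imap(func, iterable, chunksize=1)  # a lazier version of map() that returns an iterator yielding results in input order; with the default chunksize of 1 it can be much slower than map() for very long iterables
    imap_unordered(func, iterable, chunksize=1) # same as imap(), except that results are yielded in arbitrary order
    starmap(func, iterable)    # like map(), except that each element of the iterable is itself an iterable that is unpacked as arguments; blocks until the results are ready
    starmap_async(func, iterable) # a combination of starmap() and map_async(); non-blocking, returns a result object
    close() # prevents any more tasks from being submitted to the pool; workers exit once all tasks have completed
    join()  # waits for the worker processes to exit; close() or terminate() must be called first
    terminate() # stops the worker processes immediately without completing outstanding work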
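The difference between map() and starmap() is easiest to see side by side. A minimal sketch, with power() and cube() as illustrative functions:

```python
from multiprocessing import Pool

def power(base, exponent):
    return base ** exponent

def cube(x):
    return power(x, 3)

if __name__ == "__main__":
    with Pool(processes=2) as pool:
        print(pool.map(cube, [1, 2, 3]))                     # one argument per call
        print(pool.starmap(power, [(2, 3), (3, 2)]))         # each tuple is unpacked into arguments
        print(sorted(pool.imap_unordered(cube, [1, 2, 3])))  # sorted, since arrival order varies
```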
class multiprocessing.pool.AsyncResult
The class of the result returned by Pool.apply_async() and Pool.map_async().
Call its get() method to retrieve the return value.
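Besides get(), a result object can be polled with ready() and successful(). The sketch below assumes a hypothetical slow_square() function that sleeps before returning, so that the first short timeout expires:

```python
import multiprocessing
import time

def slow_square(x, delay=0.5):
    time.sleep(delay)
    return x * x

if __name__ == "__main__":
    with multiprocessing.Pool(processes=1) as pool:
        result = pool.apply_async(slow_square, (7,))
        try:
            result.get(timeout=0.01)   # too short: the worker is still sleeping
        except multiprocessing.TimeoutError:
            print("not ready yet, ready() ->", result.ready())
        print(result.get(timeout=10))  # waits for the value
        print(result.successful())     # True once the call has completed without raising
```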
Listeners and Clients
Usually message passing between processes is done using queues or by using Connection objects returned by Pipe().
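For cases where queues and pipes are not enough, the multiprocessing.connection module offers Listener and Client. The following is a minimal sketch on the loopback interface; the placeholder AUTHKEY, the function send_greeting() and the message contents are assumptions for illustration:

```python
from multiprocessing import Process
from multiprocessing.connection import Client, Listener

AUTHKEY = b"placeholder-key"  # hypothetical key; both sides must use the same value

def send_greeting(address):
    # The client connects to the listener's address and sends one picklable object.
    with Client(address, authkey=AUTHKEY) as conn:
        conn.send(["hello", 42, None])

if __name__ == "__main__":
    with Listener(("127.0.0.1", 0), authkey=AUTHKEY) as listener:  # port 0: pick a free port
        p = Process(target=send_greeting, args=(listener.address,))
        p.start()
        with listener.accept() as conn:   # blocks until the client has connected
            print(conn.recv())
        p.join()
```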

Example

from multiprocessing import Process, Queue

def f(q):
    q.put('X' * 1000000)

if __name__ == '__main__':
    queue = Queue()
    p = Process(target=f, args=(queue,))
    p.start()
    p.join()                    # this deadlocks
    obj = queue.get()
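The code above deadlocks: a child process that has put items on a queue does not terminate until all buffered items have been flushed to the underlying pipe, so join() waits forever while nothing reads from the queue. The fix is to swap the last two lines.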
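A corrected version of the example reads from the queue before joining. This is a sketch of that fix; the function is renamed fill() here only to keep it distinct from the other functions named f in these notes:

```python
from multiprocessing import Process, Queue

def fill(q):
    q.put("X" * 1000000)

if __name__ == "__main__":
    queue = Queue()
    p = Process(target=fill, args=(queue,))
    p.start()
    obj = queue.get()   # drain the queue first ...
    p.join()            # ... so the child can flush its buffer and exit
    print(len(obj))
```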
Using a process pool:
import multiprocessing
import time
import random
import sys

#
# Functions used by test code
#

def calculate(func, args):
    result = func(*args)
    return '%s says that %s%s = %s' % (
        multiprocessing.current_process().name,
        func.__name__, args, result
        )

def calculatestar(args):
    return calculate(*args)

def mul(a, b):
    time.sleep(0.5 * random.random())
    return a * b

def plus(a, b):
    time.sleep(0.5 * random.random())
    return a + b

def f(x):
    return 1.0 / (x - 5.0)

def pow3(x):
    return x ** 3

def noop(x):
    pass

#
# Test code
#

def test():
    PROCESSES = 4
    print('Creating pool with %d processes\n' % PROCESSES)

    with multiprocessing.Pool(PROCESSES) as pool:
        #
        # Tests
        #

        TASKS = [(mul, (i, 7)) for i in range(10)] + \
                [(plus, (i, 8)) for i in range(10)]

        results = [pool.apply_async(calculate, t) for t in TASKS]
        imap_it = pool.imap(calculatestar, TASKS)
        imap_unordered_it = pool.imap_unordered(calculatestar, TASKS)

        print('Ordered results using pool.apply_async():')
        for r in results:
            print('\t', r.get())
        print()

        print('Ordered results using pool.imap():')
        for x in imap_it:
            print('\t', x)
        print()

        print('Unordered results using pool.imap_unordered():')
        for x in imap_unordered_it:
            print('\t', x)
        print()

        print('Ordered results using pool.map() --- will block till complete:')
        for x in pool.map(calculatestar, TASKS):
            print('\t', x)
        print()

        #
        # Test error handling
        #

        print('Testing error handling:')

        try:
            print(pool.apply(f, (5,)))
        except ZeroDivisionError:
            print('\tGot ZeroDivisionError as expected from pool.apply()')
        else:
            raise AssertionError('expected ZeroDivisionError')

        try:
            print(pool.map(f, list(range(10))))
        except ZeroDivisionError:
            print('\tGot ZeroDivisionError as expected from pool.map()')
        else:
            raise AssertionError('expected ZeroDivisionError')

        try:
            print(list(pool.imap(f, list(range(10)))))
        except ZeroDivisionError:
            print('\tGot ZeroDivisionError as expected from list(pool.imap())')
        else:
            raise AssertionError('expected ZeroDivisionError')

        it = pool.imap(f, list(range(10)))
        for i in range(10):
            try:
                x = next(it)
            except ZeroDivisionError:
                if i == 5:
                    pass
            except StopIteration:
                break
            else:
                if i == 5:
                    raise AssertionError('expected ZeroDivisionError')

        assert i == 9
        print('\tGot ZeroDivisionError as expected from IMapIterator.next()')
        print()

        #
        # Testing timeouts
        #

        print('Testing ApplyResult.get() with timeout:', end=' ')
        res = pool.apply_async(calculate, TASKS[0])
        while 1:
            sys.stdout.flush()
            try:
                sys.stdout.write('\n\t%s' % res.get(0.02))
                break
            except multiprocessing.TimeoutError:
                sys.stdout.write('.')
        print()
        print()

        print('Testing IMapIterator.next() with timeout:', end=' ')
        it = pool.imap(calculatestar, TASKS)
        while 1:
            sys.stdout.flush()
            try:
                sys.stdout.write('\n\t%s' % it.next(0.02))
            except StopIteration:
                break
            except multiprocessing.TimeoutError:
                sys.stdout.write('.')
        print()
        print()


if __name__ == '__main__':
    multiprocessing.freeze_support()
    test()
Output:

Creating pool with 4 processes

Ordered results using pool.apply_async():
     SpawnPoolWorker-1 says that mul(0, 7) = 0
     SpawnPoolWorker-2 says that mul(1, 7) = 7
     SpawnPoolWorker-2 says that mul(2, 7) = 14
     SpawnPoolWorker-3 says that mul(3, 7) = 21
     SpawnPoolWorker-4 says that mul(4, 7) = 28
     SpawnPoolWorker-2 says that plus(0, 8) = 8
     SpawnPoolWorker-1 says that plus(1, 8) = 9
     SpawnPoolWorker-4 says that plus(2, 8) = 10
     SpawnPoolWorker-3 says that plus(3, 8) = 11
     SpawnPoolWorker-4 says that plus(4, 8) = 12

Ordered results using pool.imap():
     SpawnPoolWorker-1 says that mul(0, 7) = 0
     SpawnPoolWorker-2 says that mul(1, 7) = 7
     SpawnPoolWorker-2 says that mul(2, 7) = 14
     SpawnPoolWorker-2 says that mul(3, 7) = 21
     SpawnPoolWorker-3 says that mul(4, 7) = 28
     SpawnPoolWorker-4 says that plus(0, 8) = 8
     SpawnPoolWorker-1 says that plus(1, 8) = 9
     SpawnPoolWorker-2 says that plus(2, 8) = 10
     SpawnPoolWorker-1 says that plus(3, 8) = 11
     SpawnPoolWorker-2 says that plus(4, 8) = 12

Unordered results using pool.imap_unordered():
     SpawnPoolWorker-3 says that mul(0, 7) = 0
     SpawnPoolWorker-3 says that mul(3, 7) = 21
     SpawnPoolWorker-4 says that mul(1, 7) = 7
     SpawnPoolWorker-3 says that mul(4, 7) = 28
     SpawnPoolWorker-1 says that mul(2, 7) = 14
     SpawnPoolWorker-2 says that plus(0, 8) = 8
     SpawnPoolWorker-3 says that plus(2, 8) = 10
     SpawnPoolWorker-4 says that plus(1, 8) = 9
     SpawnPoolWorker-1 says that plus(3, 8) = 11
     SpawnPoolWorker-2 says that plus(4, 8) = 12

Ordered results using pool.map() --- will block till complete:
     SpawnPoolWorker-3 says that mul(0, 7) = 0
     SpawnPoolWorker-4 says that mul(1, 7) = 7
     SpawnPoolWorker-1 says that mul(2, 7) = 14
     SpawnPoolWorker-2 says that mul(3, 7) = 21
     SpawnPoolWorker-4 says that mul(4, 7) = 28
     SpawnPoolWorker-2 says that plus(0, 8) = 8
     SpawnPoolWorker-4 says that plus(1, 8) = 9
     SpawnPoolWorker-1 says that plus(2, 8) = 10
     SpawnPoolWorker-2 says that plus(3, 8) = 11
     SpawnPoolWorker-3 says that plus(4, 8) = 12

Testing error handling:
    Got ZeroDivisionError as expected from pool.apply()
    Got ZeroDivisionError as expected from pool.map()
    Got ZeroDivisionError as expected from list(pool.imap())
    Got ZeroDivisionError as expected from IMapIterator.next()

Testing ApplyResult.get() with timeout: ....
    SpawnPoolWorker-2 says that mul(0, 7) = 0

Testing IMapIterator.next() with timeout: ...
    SpawnPoolWorker-1 says that mul(0, 7) = 0..........
    SpawnPoolWorker-3 says that mul(1, 7) = 7
    SpawnPoolWorker-4 says that mul(2, 7) = 14
    SpawnPoolWorker-2 says that mul(3, 7) = 21
    SpawnPoolWorker-1 says that mul(4, 7) = 28
    SpawnPoolWorker-2 says that plus(0, 8) = 8...
    SpawnPoolWorker-1 says that plus(1, 8) = 9
    SpawnPoolWorker-2 says that plus(2, 8) = 10..........
    SpawnPoolWorker-4 says that plus(3, 8) = 11
    SpawnPoolWorker-3 says that plus(4, 8) = 12


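# As the output shows, the Pool methods behave as follows:
pool.apply_async()  results are retrieved in submission order here, because get() is called on each result object in the order the tasks were submitted
pool.imap()         results are ordered, following the order of the input iterable
pool.imap_unordered() results arrive in arbitrary order
pool.map()  results are ordered, but the call blocks: results are only available once every task has finished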

import time
import random

from multiprocessing import Process, Queue, current_process, freeze_support

#
# Function run by worker processes
#

def worker(input, output):
    for func, args in iter(input.get, 'STOP'):  # keep calling input.get() until the 'STOP' sentinel is retrieved
        result = calculate(func, args)
        output.put(result)

#
# Function used to calculate result
#

def calculate(func, args):
    result = func(*args)
    return '%s says that %s%s = %s' % \
        (current_process().name, func.__name__, args, result)

#
# Functions referenced by tasks
#

def mul(a, b):
    time.sleep(0.5*random.random())
    return a * b

def plus(a, b):
    time.sleep(0.5*random.random())
    return a + b

#
#
#

def test():
    NUMBER_OF_PROCESSES = 4
    TASKS1 = [(mul, (i, 7)) for i in range(5)]
    TASKS2 = [(plus, (i, 8)) for i in range(5)]

    # Create queues
    task_queue = Queue()
    done_queue = Queue()

    # Submit tasks
    for task in TASKS1:
        task_queue.put(task)

    # Start worker processes
    for i in range(NUMBER_OF_PROCESSES):
        Process(target=worker, args=(task_queue, done_queue)).start()

    # Get and print results
    print('Unordered results:')
    for i in range(len(TASKS1)):
        print('\t', done_queue.get())

    # Add more tasks using `put()`
    for task in TASKS2:
        task_queue.put(task)

    # Get and print some more results
    for i in range(len(TASKS2)):
        print('\t', done_queue.get())

    # Tell child processes to stop
    for i in range(NUMBER_OF_PROCESSES):
        task_queue.put('STOP')  # add one 'STOP' sentinel per worker process so that every worker exits


if __name__ == '__main__':
    freeze_support()
    test()
Output:

Unordered results:
     Process-2 says that mul(1, 7) = 7
     Process-1 says that mul(0, 7) = 0
     Process-4 says that mul(4, 7) = 28
     Process-2 says that mul(3, 7) = 21
     Process-3 says that mul(2, 7) = 14
     Process-3 says that plus(3, 8) = 11
     Process-1 says that plus(0, 8) = 8
     Process-3 says that plus(4, 8) = 12
     Process-2 says that plus(2, 8) = 10
     Process-4 says that plus(1, 8) = 9