Python——multiprocessing包的学习心得(进程间通信)

隆飞宇
2023-12-01

    为了快速开发,使用了python。发现其性能确实令人堪忧,虽然使用了python的多线程,但python的多线程有一个全局锁,其实并未真正的充分利用系统的硬件资源(不能充分利用多核)。又不想颠覆性的开发,经同事建议,就开始学习multiprocessing包。

    multiprocessing包实际上就是个进程管理的包。可以方便的管理多个进程。经过几天的学习,只改动了几行的代码,就使得原有的程序变成了“多进程/多线程”相结合的程序了。可悲的事,经测试下来,性能也并未得到大幅的提高。(这可能是由于连接redis、连接数据库等问题导致的,整体测试,大概30秒,可读取redis队列中的20万条以上的数据,大致6666条/s)

      其实,之前写“多进程/多线程”的程序并不多。更少有进程间通信。而这次通过对代码的优化。对进程间通信的三种方式“信号量”、“管道”、“共享内存”中的后两种,有了较为深刻的理解。“共享内存”这里就不讲了(最简单的就是共享一个全局变量,我想有点经验的都懂得)。主要说明一下利用“管道”,实施的进程间通信的。当然,也是利用multiprocessing包。直接上带码吧。看着代码,更直观。

代码:

from multiprocessing import Process,Queue
import time,os,random,signal

def child_function(my_queue):
    mypid = os.getpid()
    print "mypid-%d run...\n" % (mypid)
    seconds = random.randint(5,15)
    time.sleep(seconds);
    print "spid:%s exit\n" % mypid
    my_queue.put(mypid)
    

if __name__ == '__main__':
    
    process_num = 10
    process_recond = {}
    exist_proce_num = len(process_recond)
    q = Queue()
    
    #signal.signal(signal.SIGCHLD,signal.SIG_IGN)

    flag = 1
    while(flag<5):
        print "come into while,and get message\n"
        while q.empty()!=True:
            message = q.get()
            print "check_message:%s" % message
            sub_process = process_recond[message]
            if sub_process.is_alive() == True:
                sub_process.terminate()
            del process_recond[message]
            
        exist_proce_num = len(process_recond)
        print "run: %d, process_num: %d\n" % (flag,exist_proce_num)
        if(exist_proce_num < 10):
            star_process_num = 10-exist_proce_num
            for i in range(star_process_num):
                p = Process(target=child_function,args=(q,))
                p.start()
                key = p.pid
                process_recond[key] = p
                print "new process was created,spid: %s " % key      
        
        flag = flag+1
        ltime = time.localtime(time.time())
        timeStr = time.strftime("%Y-%m-%d %H:%M:%S", ltime)
        print "%s parent process-pid: %d ,run is %d times\n" % (timeStr,os.getpid(),flag)        
        time.sleep(20)
    
    q.close()
    print "main process %d over\n" % os.getpid()

     上面的代码,主要目的是想:利用主进程(守护进程)创建多个子进程实现并行处理,而最关键的是,主进程能够监控子进程。如果某个子进程出现了问题。子进程能够通知主进程,而主进程则杀掉该子进程,并重新创建新的进程。始终保持程序拥有设定好的进程数,执行任务。

       代码中,主进程关键是利用了字典“process_recond”记录已经创建的进程号和进程句柄的对应关系。并通过对 “process_recond”的监控和管理,达到对子进程的管理。

    key = p.pid
    process_recond[key] = p

      需要注意的是,为了使得主进程能够循环执行,且不死循环,利用了flag变量来决定循环次数。一般应该是在循环时检查配置文件,通过读取配置文件的形式决定程序运行态。

      代码中,子进程执行的是“child_function”函数。在该函数中,是实际执行你想并行执行的程序代码(这里仅写了测试代码)。而关键的是,当此程序出现问题时,需要告诉主进程“我有问题了,请将我关闭,重新创建新的进程”。

      主进程面临两个问题:1、如何得知此消息;2、如何确定是哪个进程出了问题。

      第1个问题,是本次学习multiprocessing包的关键。它提供了一个Queue(虽然是队列,实际就是“管道”的作用)的类。它为主进程与子进程的通信建立起了桥梁。

       主进程,首先实例化了一个Queue的对象q,创建子进程时,将该对象传入子进程。这样子进程就能通过该队列与主进程进行通信。

    q = Queue()
    p = Process(target=child_function,args=(q,))

        主进程,通过对队列q(管道)的监听和操作,来获取来自子进程的消息。例如:

    q.empty()     //查看队列是否为空;

    message = q.get()   //如果消息不为空,可通过此操作,获取队列内的消息。 

       
        子进程,同样通过该队列对象q,向主进程发送消息。

     my_queue.put(mypid)  //子进程传入的队列q,改名为my_queue,通过put,将自己的进程号告知主进程。         

        第2个问题,在解决了第一个问题后,就非常容易解决。每个进程在创建后,都会有一个唯一标示,即进程号,pid。子进程,只要把自己的进程号告知主进程,主进程就知道了,哪个子进程报告自己有问题,需要关闭重启了。

          以上,就是利用multiprocessing包中的Queue,实现了进程间的通信,它基本就是利用“管道”通信的原理。从而实现了多进程的管理。

          可能有人会说,这不就是个进程池么。始终保持设定的进程数。为何搞得如此麻烦。

          说的很对。本人开头已说,对多进程编程不熟悉,理解不深。通过上面的代码编写,主要是一个学习的过程。但是进程池,个人认为仅是维护进程个数,监控子进程,始终保持进程数。但并没有进程间通信的概念了。这样,就无法达到进程间的协调与同步。所以,还是有很大区别的。

           不过,另一个学习心得也引出来了。multiprocessing包本身就有进程池Pool的概念。它有现成的机制可以利用。



 类似资料: