为了快速开发,使用了python。发现其性能确实令人堪忧,虽然使用了python的多线程,但python的多线程有一个全局锁,其实并未真正的充分利用系统的硬件资源(不能充分利用多核)。又不想颠覆性的开发,经同事建议,就开始学习multiprocessing包。
multiprocessing包实际上就是个进程管理的包。可以方便的管理多个进程。经过几天的学习,只改动了几行的代码,就使得原有的程序变成了“多进程/多线程”相结合的程序了。可悲的事,经测试下来,性能也并未得到大幅的提高。(这可能是由于连接redis、连接数据库等问题导致的,整体测试,大概30秒,可读取redis队列中的20万条以上的数据,大致6666条/s)
其实,之前写“多进程/多线程”的程序并不多。更少有进程间通信。而这次通过对代码的优化。对进程间通信的三种方式“信号量”、“管道”、“共享内存”中的后两种,有了较为深刻的理解。“共享内存”这里就不讲了(最简单的就是共享一个全局变量,我想有点经验的都懂得)。主要说明一下利用“管道”,实施的进程间通信的。当然,也是利用multiprocessing包。直接上带码吧。看着代码,更直观。
代码:
from multiprocessing import Process,Queue
import time,os,random,signal
def child_function(my_queue):
mypid = os.getpid()
print "mypid-%d run...\n" % (mypid)
seconds = random.randint(5,15)
time.sleep(seconds);
print "spid:%s exit\n" % mypid
my_queue.put(mypid)
if __name__ == '__main__':
process_num = 10
process_recond = {}
exist_proce_num = len(process_recond)
q = Queue()
#signal.signal(signal.SIGCHLD,signal.SIG_IGN)
flag = 1
while(flag<5):
print "come into while,and get message\n"
while q.empty()!=True:
message = q.get()
print "check_message:%s" % message
sub_process = process_recond[message]
if sub_process.is_alive() == True:
sub_process.terminate()
del process_recond[message]
exist_proce_num = len(process_recond)
print "run: %d, process_num: %d\n" % (flag,exist_proce_num)
if(exist_proce_num < 10):
star_process_num = 10-exist_proce_num
for i in range(star_process_num):
p = Process(target=child_function,args=(q,))
p.start()
key = p.pid
process_recond[key] = p
print "new process was created,spid: %s " % key
flag = flag+1
ltime = time.localtime(time.time())
timeStr = time.strftime("%Y-%m-%d %H:%M:%S", ltime)
print "%s parent process-pid: %d ,run is %d times\n" % (timeStr,os.getpid(),flag)
time.sleep(20)
q.close()
print "main process %d over\n" % os.getpid()
代码中,主进程关键是利用了字典“process_recond”记录已经创建的进程号和进程句柄的对应关系。并通过对 “process_recond”的监控和管理,达到对子进程的管理。
key = p.pid
process_recond[key] = p
需要注意的是,为了使得主进程能够循环执行,且不死循环,利用了flag变量来决定循环次数。一般应该是在循环时检查配置文件,通过读取配置文件的形式决定程序运行态。
代码中,子进程执行的是“child_function”函数。在该函数中,是实际执行你想并行执行的程序代码(这里仅写了测试代码)。而关键的是,当此程序出现问题时,需要告诉主进程“我有问题了,请将我关闭,重新创建新的进程”。
主进程面临两个问题:1、如何得知此消息;2、如何确定是哪个进程出了问题。
第1个问题,是本次学习multiprocessing包的关键。它提供了一个Queue(虽然是队列,实际就是“管道”的作用)的类。它为主进程与子进程的通信建立起了桥梁。
主进程,首先实例化了一个Queue的对象q,创建子进程时,将该对象传入子进程。这样子进程就能通过该队列与主进程进行通信。
q = Queue()
p = Process(target=child_function,args=(q,))
主进程,通过对队列q(管道)的监听和操作,来获取来自子进程的消息。例如:
q.empty() //查看队列是否为空;
message = q.get() //如果消息不为空,可通过此操作,获取队列内的消息。
子进程,同样通过该队列对象q,向主进程发送消息。
my_queue.put(mypid) //子进程传入的队列q,改名为my_queue,通过put,将自己的进程号告知主进程。
第2个问题,在解决了第一个问题后,就非常容易解决。每个进程在创建后,都会有一个唯一标示,即进程号,pid。子进程,只要把自己的进程号告知主进程,主进程就知道了,哪个子进程报告自己有问题,需要关闭重启了。
以上,就是利用multiprocessing包中的Queue,实现了进程间的通信,它基本就是利用“管道”通信的原理。从而实现了多进程的管理。
可能有人会说,这不就是个进程池么。始终保持设定的进程数。为何搞得如此麻烦。
说的很对。本人开头已说,对多进程编程不熟悉,理解不深。通过上面的代码编写,主要是一个学习的过程。但是进程池,个人认为仅是维护进程个数,监控子进程,始终保持进程数。但并没有进程间通信的概念了。这样,就无法达到进程间的协调与同步。所以,还是有很大区别的。
不过,另一个学习心得也引出来了。multiprocessing包本身就有进程池Pool的概念。它有现成的机制可以利用。