本文实例讲述了Python多进程multiprocessing用法。分享给大家供大家参考,具体如下:
mutilprocess简介
像线程一样管理进程,这个是mutilprocess的核心,他与threading很是相像,对多核CPU的利用率会比threading好的多。
简单的创建进程:
import multiprocessing def worker(num): """thread worker function""" print 'Worker:', num return if __name__ == '__main__': jobs = [] for i in range(5): p = multiprocessing.Process(target=worker, args=(i,)) jobs.append(p) p.start()
确定当前的进程,即是给进程命名,方便标识区分,跟踪
import multiprocessing import time def worker(): name = multiprocessing.current_process().name print name, 'Starting' time.sleep(2) print name, 'Exiting' def my_service(): name = multiprocessing.current_process().name print name, 'Starting' time.sleep(3) print name, 'Exiting' if __name__ == '__main__': service = multiprocessing.Process(name='my_service', target=my_service) worker_1 = multiprocessing.Process(name='worker 1', target=worker) worker_2 = multiprocessing.Process(target=worker) # default name worker_1.start() worker_2.start() service.start()
守护进程就是不阻挡主程序退出,自己干自己的 mutilprocess.setDaemon(True)就这句等待守护进程退出,要加上join,join可以传入浮点数值,等待n久就不等了
守护进程:
import multiprocessing import time import sys def daemon(): name = multiprocessing.current_process().name print 'Starting:', name time.sleep(2) print 'Exiting :', name def non_daemon(): name = multiprocessing.current_process().name print 'Starting:', name print 'Exiting :', name if __name__ == '__main__': d = multiprocessing.Process(name='daemon', target=daemon) d.daemon = True n = multiprocessing.Process(name='non-daemon', target=non_daemon) n.daemon = False d.start() n.start() d.join(1) print 'd.is_alive()', d.is_alive() n.join()
最好使用 poison pill,强制的使用terminate()注意 terminate之后要join,使其可以更新状态
终止进程:
import multiprocessing import time def slow_worker(): print 'Starting worker' time.sleep(0.1) print 'Finished worker' if __name__ == '__main__': p = multiprocessing.Process(target=slow_worker) print 'BEFORE:', p, p.is_alive() p.start() print 'DURING:', p, p.is_alive() p.terminate() print 'TERMINATED:', p, p.is_alive() p.join() print 'JOINED:', p, p.is_alive()
①. == 0 未生成任何错误
②. 0 进程有一个错误,并以该错误码退出
③. < 0 进程由一个-1 * exitcode信号结束
进程的退出状态:
import multiprocessing import sys import time def exit_error(): sys.exit(1) def exit_ok(): return def return_value(): return 1 def raises(): raise RuntimeError('There was an error!') def terminated(): time.sleep(3) if __name__ == '__main__': jobs = [] for f in [exit_error, exit_ok, return_value, raises, terminated]: print 'Starting process for', f.func_name j = multiprocessing.Process(target=f, name=f.func_name) jobs.append(j) j.start() jobs[-1].terminate() for j in jobs: j.join() print '%15s.exitcode = %s' % (j.name, j.exitcode)
方便的调试,可以用logging
日志:
import multiprocessing import logging import sys def worker(): print 'Doing some work' sys.stdout.flush() if __name__ == '__main__': multiprocessing.log_to_stderr() logger = multiprocessing.get_logger() logger.setLevel(logging.INFO) p = multiprocessing.Process(target=worker) p.start() p.join()
利用class来创建进程,定制子类
派生进程:
import multiprocessing class Worker(multiprocessing.Process): def run(self): print 'In %s' % self.name return if __name__ == '__main__': jobs = [] for i in range(5): p = Worker() jobs.append(p) p.start() for j in jobs: j.join()
python进程间传递消息:
import multiprocessing class MyFancyClass(object): def __init__(self, name): self.name = name def do_something(self): proc_name = multiprocessing.current_process().name print 'Doing something fancy in %s for %s!' % \ (proc_name, self.name) def worker(q): obj = q.get() obj.do_something() if __name__ == '__main__': queue = multiprocessing.Queue() p = multiprocessing.Process(target=worker, args=(queue,)) p.start() queue.put(MyFancyClass('Fancy Dan')) # Wait for the worker to finish queue.close() queue.join_thread() p.join() import multiprocessing import time class Consumer(multiprocessing.Process): def __init__(self, task_queue, result_queue): multiprocessing.Process.__init__(self) self.task_queue = task_queue self.result_queue = result_queue def run(self): proc_name = self.name while True: next_task = self.task_queue.get() if next_task is None: # Poison pill means shutdown print '%s: Exiting' % proc_name self.task_queue.task_done() break print '%s: %s' % (proc_name, next_task) answer = next_task() self.task_queue.task_done() self.result_queue.put(answer) return class Task(object): def __init__(self, a, b): self.a = a self.b = b def __call__(self): time.sleep(0.1) # pretend to take some time to do the work return '%s * %s = %s' % (self.a, self.b, self.a * self.b) def __str__(self): return '%s * %s' % (self.a, self.b) if __name__ == '__main__': # Establish communication queues tasks = multiprocessing.JoinableQueue() results = multiprocessing.Queue() # Start consumers num_consumers = multiprocessing.cpu_count() * 2 print 'Creating %d consumers' % num_consumers consumers = [ Consumer(tasks, results) for i in xrange(num_consumers) ] for w in consumers: w.start() # Enqueue jobs num_jobs = 10 for i in xrange(num_jobs): tasks.put(Task(i, i)) # Add a poison pill for each consumer for i in xrange(num_consumers): tasks.put(None) # Wait for all of the tasks to finish tasks.join() # Start printing results while num_jobs: result = results.get() print 'Result:', result num_jobs -= 1
Event提供一种简单的方法,可以在进程间传递状态信息。事件可以切换设置和未设置状态。通过使用一个可选的超时值,时间对象的用户可以等待其状态从未设置变为设置。
进程间信号传递:
import multiprocessing import time def wait_for_event(e): """Wait for the event to be set before doing anything""" print 'wait_for_event: starting' e.wait() print 'wait_for_event: e.is_set()->', e.is_set() def wait_for_event_timeout(e, t): """Wait t seconds and then timeout""" print 'wait_for_event_timeout: starting' e.wait(t) print 'wait_for_event_timeout: e.is_set()->', e.is_set() if __name__ == '__main__': e = multiprocessing.Event() w1 = multiprocessing.Process(name='block', target=wait_for_event, args=(e,)) w1.start() w2 = multiprocessing.Process(name='nonblock', target=wait_for_event_timeout, args=(e, 2)) w2.start() print 'main: waiting before calling Event.set()' time.sleep(3) e.set() print 'main: event is set'
Python多进程,一般的情况是Queue来传递。
Queue:
from multiprocessing import Process, Queue def f(q): q.put([42, None, 'hello']) if __name__ == '__main__': q = Queue() p = Process(target=f, args=(q,)) p.start() print q.get() # prints "[42, None, 'hello']" p.join()
多线程优先队列Queue:
import Queue import threading import time exitFlag = 0 class myThread (threading.Thread): def __init__(self, threadID, name, q): threading.Thread.__init__(self) self.threadID = threadID self.name = name self.q = q def run(self): print "Starting " + self.name process_data(self.name, self.q) print "Exiting " + self.name def process_data(threadName, q): while not exitFlag: queueLock.acquire() if not workQueue.empty(): data = q.get() queueLock.release() print "%s processing %s" % (threadName, data) else: queueLock.release() time.sleep(1) threadList = ["Thread-1", "Thread-2", "Thread-3"] nameList = ["One", "Two", "Three", "Four", "Five"] queueLock = threading.Lock() workQueue = Queue.Queue(10) threads = [] threadID = 1 # Create new threads for tName in threadList: thread = myThread(threadID, tName, workQueue) thread.start() threads.append(thread) threadID += 1 # Fill the queue queueLock.acquire() for word in nameList: workQueue.put(word) queueLock.release() # Wait for queue to empty while not workQueue.empty(): pass # Notify threads it's time to exit exitFlag = 1 # Wait for all threads to complete for t in threads: t.join() print "Exiting Main Thread"
多进程使用Queue通信的例子
import time from multiprocessing import Process,Queue MSG_QUEUE = Queue(5) def startA(msgQueue): while True: if msgQueue.empty() > 0: print ('queue is empty %d' % (msgQueue.qsize())) else: msg = msgQueue.get() print( 'get msg %s' % (msg,)) time.sleep(1) def startB(msgQueue): while True: msgQueue.put('hello world') print( 'put hello world queue size is %d' % (msgQueue.qsize(),)) time.sleep(3) if __name__ == '__main__': processA = Process(target=startA,args=(MSG_QUEUE,)) processB = Process(target=startB,args=(MSG_QUEUE,)) processA.start() print( 'processA start..')
主进程定义了一个Queue类型的变量,并作为Process的args参数传给子进程processA和processB,两个进程一个向队列中写数据,一个读数据。
更多关于Python相关内容感兴趣的读者可查看本站专题:《Python进程与线程操作技巧总结》、《Python Socket编程技巧总结》、《Python数据结构与算法教程》、《Python函数使用技巧总结》、《Python字符串操作技巧汇总》、《Python入门与进阶经典教程》及《Python文件与目录操作技巧汇总》
希望本文所述对大家Python程序设计有所帮助。
本文向大家介绍Python多进程并发(multiprocessing)用法实例详解,包括了Python多进程并发(multiprocessing)用法实例详解的使用技巧和注意事项,需要的朋友参考一下 本文实例讲述了Python多进程并发(multiprocessing)用法。分享给大家供大家参考。具体分析如下: 由于Python设计的限制(我说的是咱们常用的CPython)。最多只能用满1个CPU
本文向大家介绍Python多进程编程multiprocessing代码实例,包括了Python多进程编程multiprocessing代码实例的使用技巧和注意事项,需要的朋友参考一下 在 多线程与多进程的比较 这一篇中记录了多进程编程的一种方式. 下面记录一下多进程编程的别一种方式,即使用multiprocessing编程 以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持呐喊教
本文向大家介绍简单学习Python多进程Multiprocessing,包括了简单学习Python多进程Multiprocessing的使用技巧和注意事项,需要的朋友参考一下 1.1 什么是 Multiprocessing 多线程在同一时间只能处理一个任务。 可把任务平均分配给每个核,而每个核具有自己的运算空间。 1.2 添加进程 Process 与线程类似,如下所示,但是该程序直接运行无结果,因
本文向大家介绍Python守护进程用法实例分析,包括了Python守护进程用法实例分析的使用技巧和注意事项,需要的朋友参考一下 本文实例讲述了Python守护进程用法。分享给大家供大家参考。具体分析如下: 守护进程是可以一直运行而不阻塞主程序退出。要标志一个守护进程,可以将Process实例的daemon属性设置为True。代码如下: 由于主进程会在1秒后退出,守护进程的 print "Exit
本文向大家介绍python基于multiprocessing的多进程创建方法,包括了python基于multiprocessing的多进程创建方法的使用技巧和注意事项,需要的朋友参考一下 本文实例讲述了python基于multiprocessing的多进程创建方法。分享给大家供大家参考。具体如下: 定义进程的另一种方法,继承Process类,并实现run方法: 希望本文所述对大家的Python程序
本文向大家介绍Python多进程编程技术实例分析,包括了Python多进程编程技术实例分析的使用技巧和注意事项,需要的朋友参考一下 本文以实例形式分析了Python多进程编程技术,有助于进一步Python程序设计技巧。分享给大家供大家参考。具体分析如下: 一般来说,由于Python的线程有些限制,例如多线程不能充分利用多核CPU等问题,因此在Python中我们更倾向使用多进程。但在做不阻塞的异步U