queue和pipe的区别: pipe用来在两个进程间通信。queue用来在多个进程间实现通信。 此两种方法为所有系统多进程通信的基本方法,几乎所有的语言都支持此两种方法。
1)Queue & JoinableQueue
queue用来在进程间传递消息,任何可以pickle-able的对象都可以在加入到queue。
multiprocessing.JoinableQueue 是 Queue的子类,增加了task_done()和join()方法。
task_done()用来告诉queue一个task完成。一般地在调用get()获得一个task,在task结束后调用task_done()来通知Queue当前task完成。
join() 阻塞直到queue中的所有的task都被处理(即task_done方法被html" target="_blank">调用)。
代码:
import multiprocessing import timeclass Consumer(multiprocessing.Process): def __init__(self, task_queue, result_queue): multiprocessing.Process.__init__(self) self.task_queue = task_queue self.result_queue = result_queue
def run(self): proc_name = self.name while True: next_task = self.task_queue.get() if next_task is None: # Poison pill means shutdown print ('%s: Exiting' % proc_name) self.task_queue.task_done() break print ('%s: %s' % (proc_name, next_task)) answer = next_task() # __call__() self.task_queue.task_done() self.result_queue.put(answer) return
class Task(object): def __init__(self, a, b): self.a = a self.b = b def __call__(self): time.sleep(0.1) # pretend to take some time to do the work return '%s * %s = %s' % (self.a, self.b, self.a * self.b) def __str__(self): return '%s * %s' % (self.a, self.b)
if __name__ == '__main__': # Establish communication queues tasks = multiprocessing.JoinableQueue() results = multiprocessing.Queue() # Start consumers num_consumers = multiprocessing.cpu_count() print ('Creating %d consumers' % num_consumers) consumers = [ Consumer(tasks, results) for i in range(num_consumers) ] for w in consumers: w.start() # Enqueue jobs num_jobs = 10 for i in range(num_jobs): tasks.put(Task(i, i)) # Add a poison pill for each consumer for i in range(num_consumers): tasks.put(None)
# Wait for all of the tasks to finish tasks.join() # Start printing results while num_jobs: result = results.get() print ('Result:', result) num_jobs -= 1
注意小技巧: 使用None来表示task处理完毕。
运行结果:
2)pipe
pipe()返回一对连接对象,代表了pipe的两端。每个对象都有send()和recv()方法。
代码:
from multiprocessing import Process, Pipedef f(conn): conn.send([42, None, 'hello']) conn.close()
if __name__ == '__main__': parent_conn, child_conn = Pipe() p = Process(target=f, args=(child_conn,)) p.start() p.join() print(parent_conn.recv()) # prints "[42, None, 'hello']"
3)Value + Array
Value + Array 是python中共享内存 映射文件的方法,速度比较快。
from multiprocessing import Process, Value, Arraydef f(n, a): n.value = n.value + 1 for i in range(len(a)): a[i] = a[i] * 10
if __name__ == '__main__': num = Value('i', 1) arr = Array('i', range(10))
p = Process(target=f, args=(num, arr)) p.start() p.join()
print(num.value) print(arr[:]) p2 = Process(target=f, args=(num, arr)) p2.start() p2.join()
print(num.value) print(arr[:])
# the output is : # 2 # [0, 10, 20, 30, 40, 50, 60, 70, 80, 90] # 3 # [0, 100, 200, 300, 400, 500, 600, 700, 800, 900]
问题内容: 我试图从通话中抢夺,尽管我可以通过以下方式轻松实现此目标: 我想“实时”抓取。使用上述方法,PIPE等待获取所有内容,然后返回。 因此,出于日志记录目的,这不符合我的要求(例如,“查看”发生的情况)。 在运行时,有没有办法逐行获取?或者这是(必须等到关闭)的限制。 编辑 如果我切换为我只得到(不理想)的最后一行: 问题答案: 您的解释器正在缓冲。在打印语句后,添加对sys.stdout
查询数组中元素,返回下标。 参数 名称 类型 默认值 描述 array [] 数组。 value * 寻找的对象。 返回值 下标,类型:number。
本文向大家介绍Android通过继承Binder类实现多进程通信,包括了Android通过继承Binder类实现多进程通信的使用技巧和注意事项,需要的朋友参考一下 AIDL的底层是通过Binder进行通信的,通过追踪.aidl编译后自动生成的文件我们知道,文件中的Stub类用于服务端,Proxy类用于客户端调用,那么可否直接通过继承Binder类实现多进程通信呢?下面就来试一试。 效果图: 服务端
本文向大家介绍python基于queue和threading实现多线程下载实例,包括了python基于queue和threading实现多线程下载实例的使用技巧和注意事项,需要的朋友参考一下 本文实例讲述了python基于queue和threading实现多线程下载的方法,分享给大家供大家参考。具体方法如下: 主代码如下: 其中downloadworkers.py 类继承 threading.Th
本文向大家介绍python执行子进程实现进程间通信的方法,包括了python执行子进程实现进程间通信的方法的使用技巧和注意事项,需要的朋友参考一下 本文实例讲述了python执行子进程实现进程间通信的方法。分享给大家供大家参考。具体实现方法如下: a.py: b.py: 希望本文所述对大家的Python程序设计有所帮助。
本文向大家介绍Python通过队列来实现进程间通信的示例,包括了Python通过队列来实现进程间通信的示例的使用技巧和注意事项,需要的朋友参考一下 Python程序中,在进程和进程之间是不共享全局变量的数据的。 我们来看一个例子: 进程 p1 里对全局变量 nums 循环进行处理,进程 p2 将 nums 打印出来,发现 nums 的值没有变化。 运行结果: in process1 pid=578