当前位置: 首页 > 编程笔记 >

详解python之多进程和进程池(Processing库)

尹臻
2023-03-14
本文向大家介绍详解python之多进程和进程池(Processing库),包括了详解python之多进程和进程池(Processing库)的使用技巧和注意事项,需要的朋友参考一下

环境:win7+python2.7

一直想学习多进程或多线程,但之前只是单纯看一点基础知识还有简单的介绍,无法理解怎么去应用,直到前段时间看了github的一个爬虫项目涉及到多进程,多线程相关内容,一边看一边百度相关知识点,现在把一些相关知识点和一些应用写下来做个记录.

首先说下什么是进程:进程是程序在计算机上的一次执行活动,当运行一个程序的时候,就启动了一个进程.而进程又分为系统进程和用户进程.只要是用于完成操作系统的各种功能的进程就是系统进程,它们就是处于运行状态下的操作系统本身;而所有由你启动的进程都是用户进程。进程是操作系统进行资源分配的单位。

直观点说,在任务管理器的用户名上标明system的是系统进程,标明administrator的是用户进程,另外net是网洛,lcacal service是本地服务,关于进程更加具体的信息可以百科,这里得省点力气,不然收不回了.

一.多进程的简单使用

如图,multiprocessing有多个函数,很多我也还没去了解,这里只讲我目前了解的.

进程创建:Process(target=主要运行的函数,name=自定义进程名称可不写,args=(参数))

方法:

  1. is_alive():判断进程是否存活
  2. join([timeout]):子进程结束再执行下一步,timeout为超时时间,有时进程遇到阻塞,为了程序能够运行下去而设置超时时间
  3. run():如果在创建Process对象的时候不指定target,那么就会默认执行Process的run方法
  4. start():启动进程,区分run()
  5. terminate():终止进程,关于终止进程没有这么简单,貌似用psutil包会更好,有机会以后了解更多再写下。

其中,Process以start()启动某个进程。

属性:

  1. authkey: 在文档中authkey()函数找到这么一句话:Set authorization key of process设置过程的授权密钥 ,目前没找到相关应用实例,这个密钥是怎么用的呢?文章不提
  2. daemon:父进程终止后自动终止,且自己不能产生新进程,必须在start()之前设置
  3. exitcode:进程在运行时为None、如果为–N,表示被信号N结束
  4. name:进程的名字,自定义
  5. pid:每个进程有唯一的PID编号。

1.Process(),start(),join()

# -*- coding:utf-8 -*-
from multiprocessing import Process
import time

def fun1(t):
 print 'this is fun1',time.ctime()
 time.sleep(t)
 print 'fun1 finish',time.ctime()

def fun2(t):
 print 'this is fun2',time.ctime()
 time.sleep(t)
 print 'fun2 finish',time.ctime()

if __name__ == '__main__':
 a=time.time()
 p1=Process(target=fun1,args=(4,))
 p2 = Process(target=fun2, args=(6,))
 p1.start()
 p2.start()
 p1.join()
 p2.join()
 b=time.time()
 print 'finish',b-a

这里一共开了两个进程,p1和p2,arg=(4,)中的4是fun1函数的参数,这里要用tulpe类型,如果两个参数或更多就是arg=(参数1,参数2...),之后用start()启动进程,我们设置等待p1和p2进程结束再执行下一步.来看下面的运行结果,fun2和fun1基本在同一时间开始运行,当运行完毕(fun1睡眠4秒,同时fun2睡眠6秒),才执行print 'finish',b-a语句

this is fun2 Mon Jun 05 13:48:04 2017
this is fun1 Mon Jun 05 13:48:04 2017
fun1 finish Mon Jun 05 13:48:08 2017
fun2 finish Mon Jun 05 13:48:10 2017
finish 6.20300006866

Process finished with exit code 0

我们再来看下start()与join()处于不同位置会发生什么

# -*- coding:utf-8 -*-
from multiprocessing import Process
import time

def fun1(t):
 print 'this is fun1',time.ctime()
 time.sleep(t)
 print 'fun1 finish',time.ctime()

def fun2(t):
 print 'this is fun2',time.ctime()
 time.sleep(t)
 print 'fun2 finish',time.ctime()

if __name__ == '__main__':
 a=time.time()
 p1=Process(target=fun1,args=(4,))
 p2 = Process(target=fun2, args=(6,))
 p1.start()
 p1.join()
 p2.start()
 p2.join()
 b=time.time()
 print 'finish',b-a

结果:

this is fun1 Mon Jun 05 14:19:28 2017
fun1 finish Mon Jun 05 14:19:32 2017
this is fun2 Mon Jun 05 14:19:32 2017
fun2 finish Mon Jun 05 14:19:38 2017
finish 10.1229999065

Process finished with exit code 0

看,现在是先运行fun1函数,运行完毕再运行fun2接着再是print 'finish',即先运行进程p1再运行进程p2,感受到join()的魅力了吧.现在再试试注释掉join()看看又会出现什么

# -*- coding:utf-8 -*-
from multiprocessing import Process
import time

def fun1(t):
 print 'this is fun1',time.ctime()
 time.sleep(t)
 print 'fun1 finish',time.ctime()

def fun2(t):
 print 'this is fun2',time.ctime()
 time.sleep(t)
 print 'fun2 finish',time.ctime()

if __name__ == '__main__':
 a=time.time()
 p1=Process(target=fun1,args=(4,))
 p2 = Process(target=fun2, args=(6,))
 p1.start()
 p2.start()
 p1.join()
 #p2.join()
 b=time.time()
 print 'finish',b-a

结果:

this is fun1 Mon Jun 05 14:23:57 2017
this is fun2 Mon Jun 05 14:23:58 2017
fun1 finish Mon Jun 05 14:24:01 2017
finish 4.05900001526
fun2 finish Mon Jun 05 14:24:04 2017

Process finished with exit code 0

这次是运行完fun1(因为p1进程有用join(),所以主程序等待p1运行完接着执行下一步),接着继续运行主进程的print 'finish',最后fun2运行完毕才结束

2.name,daemon,is_alive():

# -*- coding:utf-8 -*-
from multiprocessing import Process
import time

def fun1(t):
 print 'this is fun1',time.ctime()
 time.sleep(t)
 print 'fun1 finish',time.ctime()

def fun2(t):
 print 'this is fun2',time.ctime()
 time.sleep(t)
 print 'fun2 finish',time.ctime()

if __name__ == '__main__':
 a=time.time()
 p1=Process(name='fun1进程',target=fun1,args=(4,))
 p2 = Process(name='fun2进程',target=fun2, args=(6,))
 p1.daemon=True
 p2.daemon = True
 p1.start()
 p2.start()
 p1.join()
 print p1,p2
 print '进程1:',p1.is_alive(),'进程2:',p2.is_alive()
 #p2.join()
 b=time.time()
 print 'finish',b-a

结果:

this is fun2 Mon Jun 05 14:43:49 2017
this is fun1 Mon Jun 05 14:43:49 2017
fun1 finish Mon Jun 05 14:43:53 2017
<Process(fun1进程, stopped daemon)> <Process(fun2进程, started daemon)>
进程1: False 进程2: True
finish 4.06500005722

Process finished with exit code 0

可以看到,name是给进程赋予名字, 运行到print '进程1:',p1.is_alive(),'进程2:',p2.is_alive() 这句的时候,p1进程已经结束(返回False),p2进程仍然在运行(返回True),但p2没有用join(),所以直接接着执行主进程,由于用了daemon=Ture,父进程终止后自动终止,p2进程没有结束就强行结束整个程序了.

3.run()

run()在Process没有指定target函数时,默认用run()函数运行程序,

# -*- coding:utf-8 -*-
from multiprocessing import Process
import time

def fun1(t):
 print 'this is fun1',time.ctime()
 time.sleep(t)
 print 'fun1 finish',time.ctime()

def fun2(t):
 print 'this is fun2',time.ctime()
 time.sleep(t)
 print 'fun2 finish',time.ctime()

if __name__ == '__main__':
 a = time.time()
 p=Process()
 p.start()
 p.join()
 b = time.time()
 print 'finish', b - a

结果:

finish 0.0840001106262

从结果看出,进程p什么也没做,为了让进程正常运行,我们酱紫写:

目标函数没有参数:

# -*- coding:utf-8 -*-
from multiprocessing import Process
import time

def fun1():
 print 'this is fun1',time.ctime()
 time.sleep(2)
 print 'fun1 finish',time.ctime()

def fun2(t):
 print 'this is fun2',time.ctime()
 time.sleep(t)
 print 'fun2 finish',time.ctime()

if __name__ == '__main__':
 a = time.time()
 p=Process()
 p.run=fun1
 p.start()
 p.join()
 b = time.time()
 print 'finish', b - a

结果:

this is fun1 Mon Jun 05 16:34:41 2017
fun1 finish Mon Jun 05 16:34:43 2017
finish 2.11500000954

Process finished with exit code 0

目标函数有参数:

# -*- coding:utf-8 -*-
from multiprocessing import Process
import time

def fun1(t):
 print 'this is fun1',time.ctime()
 time.sleep(t)
 print 'fun1 finish',time.ctime()

def fun2(t):
 print 'this is fun2',time.ctime()
 time.sleep(t)
 print 'fun2 finish',time.ctime()

if __name__ == '__main__':
 a = time.time()
 p=Process()
 p.run=fun1(2)
 p.start()
 p.join()
 b = time.time()
 print 'finish', b - a

结果:

this is fun1 Mon Jun 05 16:36:27 2017
fun1 finish Mon Jun 05 16:36:29 2017
Process Process-1:
Traceback (most recent call last):
 File "E:\Anaconda2\lib\multiprocessing\process.py", line 258, in _bootstrap
 self.run()
TypeError: 'NoneType' object is not callable
finish 2.0529999733

Process finished with exit code 0

目标函数有参数的出现了异常,为什么呢?我现在还找不到原因,但是实践发现,当最后一个参数赋予进程运行后,没有其他参数,就会出现这个异常,有人知道的望告知.

二.进程池

对于需要使用几个甚至十几个进程时,我们使用Process还是比较方便的,但是如果要成百上千个进程,用Process显然太笨了,multiprocessing提供了Pool类,即现在要讲的进程池,能够将众多进程放在一起,设置一个运行进程上限,每次只运行设置的进程数,等有进程结束,再添加新的进程

Pool(processes =num):设置运行进程数,当一个进程运行完,会添加新的进程进去

apply_async(函数,(参数)):非阻塞,其中参数是tulpe类型,

apply(函数,(参数)):阻塞

close():关闭pool,不能再添加新的任务

terminate():结束运行的进程,不再处理未完成的任务

join():和Process介绍的作用一样, 但要在close或terminate之后使用。

1.单个进程池

# -*- coding:utf-8 -*-
from multiprocessing import Pool
import time

def fun1(t):
 print 'this is fun1',time.ctime()
 time.sleep(t)
 print 'fun1 finish',time.ctime()

def fun2(t):
 print 'this is fun2',time.ctime()
 time.sleep(t)
 print 'fun2 finish',time.ctime()

if __name__ == '__main__':
 a=time.time()
 pool = Pool(processes =3) # 可以同时跑3个进程
 for i in range(3,8):
  pool.apply_async(fun1,(i,))
 pool.close()
 pool.join()
 b=time.time()
 print 'finish',b-a

结果:

this is fun1 Mon Jun 05 15:15:38 2017
this is fun1 Mon Jun 05 15:15:38 2017
this is fun1 Mon Jun 05 15:15:38 2017
fun1 finish Mon Jun 05 15:15:41 2017
this is fun1 Mon Jun 05 15:15:41 2017
fun1 finish Mon Jun 05 15:15:42 2017
this is fun1 Mon Jun 05 15:15:42 2017
fun1 finish Mon Jun 05 15:15:43 2017
fun1 finish Mon Jun 05 15:15:47 2017
fun1 finish Mon Jun 05 15:15:49 2017
finish 11.1370000839

Process finished with exit code 0

从上面的结果可以看到,设置了3个运行进程上限,15:15:38这个时间同时开始三个进程,当第一个进程结束时(参数为3秒那个进程),会添加新的进程,如此循环,直至进程池运行完再执行主进程语句b=time.time() print 'finish',b-a .这里用到非阻塞apply_async(),再来对比下阻塞apply()

# -*- coding:utf-8 -*-
from multiprocessing import Pool
import time

def fun1(t):
 print 'this is fun1',time.ctime()
 time.sleep(t)
 print 'fun1 finish',time.ctime()

def fun2(t):
 print 'this is fun2',time.ctime()
 time.sleep(t)
 print 'fun2 finish',time.ctime()

if __name__ == '__main__':
 a=time.time()
 pool = Pool(processes =3) # 可以同时跑3个进程
 for i in range(3,8):
  pool.apply(fun1,(i,))
 pool.close()
 pool.join()
 b=time.time()
 print 'finish',b-a

结果:

this is fun1 Mon Jun 05 15:59:26 2017
fun1 finish Mon Jun 05 15:59:29 2017
this is fun1 Mon Jun 05 15:59:29 2017
fun1 finish Mon Jun 05 15:59:33 2017
this is fun1 Mon Jun 05 15:59:33 2017
fun1 finish Mon Jun 05 15:59:38 2017
this is fun1 Mon Jun 05 15:59:38 2017
fun1 finish Mon Jun 05 15:59:44 2017
this is fun1 Mon Jun 05 15:59:44 2017
fun1 finish Mon Jun 05 15:59:51 2017
finish 25.1610000134

Process finished with exit code 0

可以看到,阻塞是当一个进程结束后,再进行下一个进程,一般我们都用非阻塞apply_async()

2.多个进程池

上面是使用单个进程池的,对于多个进程池,我们可以用for循环,直接看代码

# -*- coding:utf-8 -*-
from multiprocessing import Pool
import time

def fun1(t):
 print 'this is fun1',time.ctime()
 time.sleep(t)
 print 'fun1 finish',time.ctime()

def fun2(t):
 print 'this is fun2',time.ctime()
 time.sleep(t)
 print 'fun2 finish',time.ctime()

if __name__ == '__main__':
 a=time.time()
 pool = Pool(processes =3) # 可以同时跑3个进程
 for fun in [fun1,fun2]:
  for i in range(3,8):
   pool.apply_async(fun,(i,))
 pool.close()
 pool.join()
 b=time.time()
 print 'finish',b-a

结果:

this is fun1 Mon Jun 05 16:04:38 2017
this is fun1 Mon Jun 05 16:04:38 2017
this is fun1 Mon Jun 05 16:04:38 2017
fun1 finish Mon Jun 05 16:04:41 2017
this is fun1 Mon Jun 05 16:04:41 2017
fun1 finish Mon Jun 05 16:04:42 2017
this is fun1 Mon Jun 05 16:04:42 2017
fun1 finish Mon Jun 05 16:04:43 2017
this is fun2 Mon Jun 05 16:04:43 2017
fun2 finish Mon Jun 05 16:04:46 2017
this is fun2 Mon Jun 05 16:04:46 2017
fun1 finish Mon Jun 05 16:04:47 2017
this is fun2 Mon Jun 05 16:04:47 2017
fun1 finish Mon Jun 05 16:04:49 2017
this is fun2 Mon Jun 05 16:04:49 2017
fun2 finish Mon Jun 05 16:04:50 2017
this is fun2 Mon Jun 05 16:04:50 2017
fun2 finish Mon Jun 05 16:04:52 2017
fun2 finish Mon Jun 05 16:04:55 2017
fun2 finish Mon Jun 05 16:04:57 2017
finish 19.1670000553

Process finished with exit code 0

看到了,在fun1运行完接着运行fun2.

另外对于没有参数的情况,就直接 pool.apply_async(funtion),无需写上参数.

在学习编写程序过程,曾遇到不用if _name_ == '_main_':而直接运行程序,这样结果会出错,经查询,在Windows上要想使用进程模块,就必须把有关进程的代码写在当前.py文件的if _name_ == ‘_main_' :语句的下面,才能正常使用Windows下的进程模块。Unix/Linux下则不需要。原因有人这么说:在执行的時候,由于你写的 py 会被当成module 读进执行。所以,一定要判断自身是否为 _main_。也就是要:

if __name__ == ‘__main__' :
# do something.

这里我自己还搞不清楚,期待以后能够理解

学习的过程中,还涉及了经常和进程一起运用的队列Queue和线程threading,有时间以后再写吧,希望对大家的学习有所帮助,也希望大家多多支持小牛知识库。

 类似资料:
  • 本文向大家介绍python并发编程之多进程、多线程、异步和协程详解,包括了python并发编程之多进程、多线程、异步和协程详解的使用技巧和注意事项,需要的朋友参考一下 最近学习python并发,于是对多进程、多线程、异步和协程做了个总结。 一、多线程 多线程就是允许一个进程内存在多个控制权,以便让多个函数同时处于激活状态,从而让多个函数的操作同时运行。即使是单CPU的计算机,也可以通过不停地在不同

  • 本文向大家介绍Python多进程fork()函数详解,包括了Python多进程fork()函数详解的使用技巧和注意事项,需要的朋友参考一下 进程 进程是程序的一次动态执行过程,它对应了从代码加载、执行到执行完毕的一个完整过程。进程是系统进行资源分配和调度的一个独立单位。进程是由代码(堆栈段)、数据(数据段)、内核状态和一组寄存器组成。 在多任务操作系统中,通过运行多个进程来并发地执行多个任务。由于

  • 本文向大家介绍nodejs基础之多进程实例详解,包括了nodejs基础之多进程实例详解的使用技巧和注意事项,需要的朋友参考一下 本文实例讲述了nodejs基础之多进程。分享给大家供大家参考,具体如下: Node.js 多进程 我们都知道 Node.js 是以单线程的模式运行的,但它使用的是事件驱动来处理并发,这样有助于我们在多核 cpu 的系统上创建多个子进程,从而提高性能。 每个子进程总是带有三

  • 本文向大家介绍Python中进程和线程的区别详解,包括了Python中进程和线程的区别详解的使用技巧和注意事项,需要的朋友参考一下 Num01–>线程 线程是操作系统中能够进行运算调度的最小单位。它被包含在进程之中,是进程中的实际运作单位。 一个线程指的是进程中一个单一顺序的控制流。 一个进程中可以并发多条线程,每条线程并行执行不同的任务。 Num02–>进程 进程就是一个程序在一个数据集上的一次

  • 在worker中又保存了一份swProcessPool的指针,这样可以将两个不同的进程池合并在一起去wait。

  • 本文向大家介绍PHP多进程编程实例详解,包括了PHP多进程编程实例详解的使用技巧和注意事项,需要的朋友参考一下 本文实例讲述了PHP多进程编程。分享给大家供大家参考,具体如下: 第一步: $ php -m  命令查看php是否安装pcntl 和 posix扩展,若没有则安装 使用场景: 1. 要进行大量的网络耗时的操作 2. 要做大量的运算,并且,系统有多个cpu,为了让用户有更快的体验,把一个任