转载自 http://blog.csdn.net/changbaohua/article/details/3777410
截取部分自己需要看的内容
摘自 stackless 网站。
Note
Stackless Python 是Python编程语言的一个增强版本,它使程序员从基于线程的编程方式中获得好处,并避免传统线程所带来的性能与复杂度问题。Stackless为 Python带来的微线程扩展,是一种低开销、轻量级的便利工具,如果使用得当,可以获益如下:
- 改进程序结构
- 增进代码可读性
- 提高编程人员生产力
以上是Stackless Python很简明的释义,但其对我们意义何在?——就在于Stackless提供的并发建模工具,比目前其它大多数传统编程语言所提供的,都更加易用: 不仅是Python自身,也包括Java、C++,以及其它。尽管还有其他一些语言提供并发特性,可它们要么是主要用于学术研究的(如 Mozart/Oz),要么是罕为使用、或用于特殊目的的专业语言(如Erlang)。而使用stackless,你将会在Python本身的所有优势之 上,在一个(但愿)你已经很熟悉的环境中,再获得并发的特性。
这自然引出了个问题:为什么要并发?
现实世界就是“并发”的,它是由一群事物(或“演员”)所组成,而这些事物以一种对彼此所知有限的、松散耦合的方式相互作用。传说中面向对象编程有 一个好处,就是对象能够对现实的世界进行模拟。这在一定程度上是正确的,面向对象编程很好地模拟了对象个体,但对于这些对象个体之间的交互,却无法以一种 理想的方式来表现。例如,如下代码实例,有什么问题?
def familyTacoNight(): husband.eat(dinner) wife.eat(dinner) son.eat(dinner) daughter.eat(dinner)
第一印象,没问题。但是,上例中存在一个微妙的安排:所有事件是次序发生的,即:直到丈夫吃完饭,妻子才开始吃;儿子则一直等到母亲吃完才吃;而女 儿则是最后一个。在现实世界中,哪怕是丈夫还堵车在路上,妻子、儿子和女儿仍然可以该吃就吃,而要在上例中的话,他们只能饿死了——甚至更糟:永远没有人 会知道这件事,因为他们永远不会有机会抛出一个异常来通知这个世界!
我个人相信,并发将是软件世界里的下一个重要范式。随着程序变得更加复杂和耗费资源,我们已经不能指望摩尔定律来每年给我们提供更快的CPU了,当 前,日常使用的个人计算机的性能提升来自于多核与多CPU机。一旦单个CPU的性能达到极限,软件开发者们将不得不转向分布式模型,靠多台计算机的互相协 作来建立强大的应用(想想GooglePlex)。为了取得多核机和分布式编程的优势,并发将很快成为做事情的方式的事实标准。
安装Stackless的细节可以在其网站上找到。现在Linux用户可以通过SubVersion取得源代码并编译;而对于Windows用户, 则有一个.zip文件供使用,需要将其解压到现有的Python安装目录中。接下来,本教程假设Stackless Python已经安装好了,可以工作,并且假设你对Python语言本身有基本的了解。
本章简要介绍了 stackless 的基本概念,后面章节将基于这些基础,来展示更加实用的功能。
微进程是stackless的基本构成单元,你可以通过提供任一个Python可调用对象(通常为函数或类的方法)来建立它,这将建立一个微进程并将其添加到调度器。这是一个快速演示:
Python 2.4.3 Stackless 3.1b3 060504 (#69, May 3 2006, 19:20:41) [MSC v.1310 32 bit (Intel)] on win32 Type "help", "copyright", "credits" or "license" for more information. >>> import stackless >>> >>> def print_x(x): ... print x ... >>> stackless.tasklet(print_x)('one') <stackless.tasklet object at 0x00A45870> >>> stackless.tasklet(print_x)('two') <stackless.tasklet object at 0x00A45A30> >>> stackless.tasklet(print_x)('three') <stackless.tasklet object at 0x00A45AB0> >>> >>> stackless.run() one two three >>>
注意,微进程将排起队来,并不运行,直到调用 stackless.run() 。
调度器控制各个微进程运行的顺序。如果刚刚建立了一组微进程,它们将按照建立的顺序来执行。在现实中,一般会建立一组可以再次被调度的微进程,好让每个都有轮次机会。一个快速演示:
Python 2.4.3 Stackless 3.1b3 060504 (#69, May 3 2006, 19:20:41) [MSC v.1310 32 bit (Intel)] on win32 Type "help", "copyright", "credits" or "license" for more information. >>> import stackless >>> >>> def print_three_times(x): ... print "1:", x ... stackless.schedule() ... print "2:", x ... stackless.schedule() ... print "3:", x ... stackless.schedule() ... >>> >>> stackless.tasklet(print_three_times)('first') <stackless.tasklet object at 0x00A45870> >>> stackless.tasklet(print_three_times)('second') <stackless.tasklet object at 0x00A45A30> >>> stackless.tasklet(print_three_times)('third') <stackless.tasklet object at 0x00A45AB0> >>> >>> stackless.run() 1: first 1: second 1: third 2: first 2: second 2: third 3: first 3: second 3: third >>>
注意:当调用 stackless.schedule() 的时候,当前活动微进程将暂停执行,并将自身重新插入到调度器队列的末尾,好让下一个微进程被执行。一旦在它前面的所有其他微进程都运行过了,它将从上次 停止的地方继续开始运行。这个过程会持续,直到所有的活动微进程都完成了运行过程。这就是使用stackless达到合作式多任务的方式。
通道使得微进程之间的信息传递成为可能。它做到了两件事:
- 能够在微进程之间交换信息。
- 能够控制运行的流程。
又一个快速演示:
C:>c:python24python Python 2.4.3 Stackless 3.1b3 060504 (#69, May 3 2006, 19:20:41) [MSC v.1310 32 bit (Intel)] on win32 Type "help", "copyright", "credits" or "license" for more information. >>> import stackless >>> >>> channel = stackless.channel() >>> >>> def receiving_tasklet(): ... print "Recieving tasklet started" ... print channel.receive() ... print "Receiving tasklet finished" ... >>> def sending_tasklet(): ... print "Sending tasklet started" ... channel.send("send from sending_tasklet") ... print "sending tasklet finished" ... >>> def another_tasklet(): ... print "Just another tasklet in the scheduler" ... >>> stackless.tasklet(receiving_tasklet)() <stackless.tasklet object at 0x00A45B30> >>> stackless.tasklet(sending_tasklet)() <stackless.tasklet object at 0x00A45B70> >>> stackless.tasklet(another_tasklet)() <stackless.tasklet object at 0x00A45BF0> >>> >>> stackless.run() Recieving tasklet started Sending tasklet started send from sending_tasklet Receiving tasklet finished Just another tasklet in the scheduler sending tasklet finished >>>
接收的微进程调用 channel.receive() 的时候,便阻塞住,这意味着该微进程暂停执行,直到有信息从这个通道送过来。除了往这个通道发送信息以外,没有其他任何方式可以让这个微进程恢复运行。
若有其他微进程向这个通道发送了信息,则不管当前的调度到了哪里,这个接收的微进程都立即恢复执行;而发送信息的微进程则被转移到调度列表的末尾,就像调用了 stackless.schedule() 一样。
同样注意,发送信息的时候,若当时没有微进程正在这个通道上接收,也会使当前微进程阻塞:
>>> stackless.tasklet(sending_tasklet)() <stackless.tasklet object at 0x00A45B70> >>> stackless.tasklet(another_tasklet)() <stackless.tasklet object at 0x00A45BF0> >>> >>> stackless.run() Sending tasklet started Just another tasklet in the scheduler >>> >>> stackless.tasklet(another_tasklet)() <stackless.tasklet object at 0x00A45B30> >>> stackless.run() Just another tasklet in the scheduler >>> >>> #Finally adding the receiving tasklet ... >>> stackless.tasklet(receiving_tasklet)() <stackless.tasklet object at 0x00A45BF0> >>> >>> stackless.run() Recieving tasklet started send from sending_tasklet Receiving tasklet finished sending tasklet finished
发送信息的微进程,只有在成功地将数据发送到了另一个微进程之后,才会重新被插入到调度器中。
以上涵盖了stackless的大部分功能。似乎不多是吧?——我们只使用了少许对象,和大约四五个函数调用,来进行操作。但是,使用这种简单的API作为基本建造单元,我们可以开始做一些真正有趣的事情。
大多数传统编程语言具有子例程的概念。一个子例程被另一个例程(可能还是其它某个例程的子例程)所调用,或返回一个结果,或不返回结果。从定义上说,一个子例程是从属于其调用者的。
见下例:
def ping(): print "PING" pong() def pong(): print "PONG" ping() ping()
有经验的编程者会看到这个程序的问题所在:它导致了堆栈溢出。如果运行这个程序,它将显示一大堆讨厌的跟踪信息,来指出堆栈空间已经耗尽。
我仔细考虑了,自己对C语言堆栈的细节究竟了解多少,最终还是决定完全不去讲它。似乎,其他人对其所尝试的描述,以及图表,只有本身已经理解了的人才能看得懂。我将试着给出一个最简单的说明,而对其有更多兴趣的读者可以从网上查找更多信息。
每当一个子例程被调用,都有一个“栈帧”被建立,这是用来保存变量,以及其他子例程局部信息的区域。于是,当你调用 ping() ,则有一个栈帧被建立,来保存这次调用相关的信息。简言之,这个帧记载着 ping 被调用了。当再调用 pong() ,则又建立了一个栈帧,记载着 pong 也被调用了。这些栈帧是串联在一起的,每个子例程调用都是其中的一环。就这样,堆栈中显示: ping 被调用所以 pong 接下来被调用。显然,当 pong() 再调用 ping() ,则使堆栈再扩展。下面是个直观的表示:
帧 | 堆栈 |
1 | ping 被调用 |
2 | ping 被调用,所以 pong 被调用 |
3 | ping 被调用,所以 pong 被调用,所以 ping 被调用 |
4 | ping 被调用,所以 pong 被调用,所以 ping 被调用,所以 pong 被调用 |
5 | ping 被调用,所以 pong 被调用,所以 ping 被调用,所以 pong 被调用,所以 ping 被调用 |
6 | ping 被调用,所以 pong 被调用,所以 ping 被调用,所以 pong 被调用,所以 ping 被调用…… |
现在假设,这个页面的宽度就表示系统为堆栈所分配的全部内存空间,当其顶到页面的边缘的时候,将会发生溢出,系统内存耗尽,即术语“堆栈溢出”。
上例是有意设计的,用来体现堆栈的问题所在。在大多数情况下,当每个子例程返回的时候,其栈帧将被清除掉,就是说堆栈将会自行实现清理过程。这一般 来说是件好事,在C语言中,堆栈就是一个不需要编程者来手动进行内存管理的区域。很幸运,Python程序员也不需要直接来担心内存管理与堆栈。但是由于 Python解释器本身也是用C实现的,那些实现者们可是需要担心这个的。使用堆栈是会使事情方便,除非我们开始调用那种从不返回的函数,如上例中的,那 时候,堆栈的表现就开始和程序员别扭起来,并耗尽可用的内存。
此时,将堆栈弄溢出是有点愚蠢的。 ping() 和 pong() 本不是真正意义的子例程,因为其中哪个也不从属于另一个,它们是“协程”,处于同等的地位,并可以彼此间进行无缝通信。
帧 | 堆栈 |
1 | ping 被调用 |
2 | pong 被调用 |
3 | ping 被调用 |
4 | pong 被调用 |
5 | ping 被调用 |
6 | pong 被调用 |
在stackless中,我们使用通道来建立协程。还记得吗,通道所带来的两个好处中的一个,就是能够控制微进程之间运行的流程。使用通道,我们可以在 ping 和 pong 这两个协程之间自由来回,要多少次就多少次,都不会堆栈溢出:
# # pingpong_stackless.py # import stackless ping_channel = stackless.channel() pong_channel = stackless.channel() def ping(): while ping_channel.receive(): #在此阻塞 print "PING" pong_channel.send("from ping") def pong(): while pong_channel.receive(): print "PONG" ping_channel.send("from pong") stackless.tasklet(ping)() stackless.tasklet(pong)() # 我们需要发送一个消息来初始化这个游戏的状态 # 否则,两个微进程都会阻塞 stackless.tasklet(ping_channel.send)('startup') stackless.run()
你可以运行这个程序要多久有多久,它都不会崩溃,且如果你检查其内存使用量(使用Windows的任务管理器或Linux的top命令),将会发现 使用量是恒定的。这个程序的协程版本,不管运行一分钟还是一天,使用的内存都是一样的。而如果你检查原先那个递归版本的内存用量,则会发现其迅速增长,直 到崩溃。
是否还记得,先前我提到过,那个代码的递归版本,有经验的程序员会一眼看出毛病。但老实说,这里面并没有什么“计算机科学”方面的原因在阻碍它的正 常工作,有些让人坚信的东西,其实只是个与实现细节有关的小问题——只因为大多数传统编程语言都使用堆栈。某种意义上说,有经验的程序员都是被洗了脑,从 而相信这是个可以接受的问题。而stackless,则真正察觉了这个问题,并除掉了它。
与当今的操作系统中内建的、和标准Python代码中所支持的普通线程相比,“微线程”要更为轻量级,正如其名称所暗示。它比传统线程占用更少的内存,并且微线程之间的切换,要比传统线程之间的切换更加节省资源。
为了准确说明微线程的效率究竟比传统线程高多少,我们用两者来写同一个程序。
Hackysack是一种游戏,就是一伙脏乎乎的小子围成一个圈,来回踢一个装满了豆粒的沙包,目标是不让这个沙包落地,当传球给别人的时候,可以耍各种把戏。踢沙包只可以用脚。
在我们的简易模拟中,我们假设一旦游戏开始,圈里人数就是恒定的,并且每个人都是如此厉害,以至于如果允许的话,这个游戏可以永远停不下来。
import thread import random import sys import Queue class hackysacker: counter = 0 def __init__(self,name,circle): self.name = name self.circle = circle circle.append(self) self.messageQueue = Queue.Queue() thread.start_new_thread(self.messageLoop,()) def incrementCounter(self): hackysacker.counter += 1 if hackysacker.counter >= turns: while self.circle: hs = self.circle.pop() if hs is not self: hs.messageQueue.put('exit') sys.exit() def messageLoop(self): while 1: message = self.messageQueue.get() if message == "exit": debugPrint("%s is going home" % self.name) sys.exit() debugPrint("%s got hackeysack from %s" % (self.name, message.name)) kickTo = self.circle[random.randint(0,len(self.circle)-1)] debugPrint("%s kicking hackeysack to %s" % (self.name, kickTo.name)) self.incrementCounter() kickTo.messageQueue.put(self) def debugPrint(x): if debug: print x debug=1 hackysackers=5 turns = 5 def runit(hs=10,ts=10,dbg=1): global hackysackers,turns,debug hackysackers = hs turns = ts debug = dbg hackysacker.counter= 0 circle = [] one = hackysacker('1',circle) for i in range(hackysackers): hackysacker(`i`,circle) one.messageQueue.put(one) try: while circle: pass except: #有时我们在清理过程中会遇到诡异的错误。 pass if __name__ == "__main__": runit(dbg=1)
一个“玩者”类的初始化用到了其名字,和一个指向包含了所有玩者的全局列表 circle 的引用,还有一个继承自Python标准库中的Queue类的消息队列。
Queue这个类的作用,与stackless的通道类似。它包含 put() 和 get() 方法,在一个空的Queue上调用 put() 会阻塞,直到另一个线程调用 put() 将数据送入Queue中为止。Queue这个类被设计为能与操作系统级的线程高效合作。
__init__ 方法接下来使用Python标准库中的thread模块新建一个线程,并在新线程中开始了一个消息循环。此消息循环是个无限循环,不停地处理队列中的消息。如果其收到一个特殊的消息 ‘exit’ ,则结束这个线程。
如果收到了另一个消息——指定其收到了沙包,玩者则从圈中随机选取一个其他玩者,通过向其发送一条消息来指定,将沙包再踢给它。
由类成员变量 hackysacker.counter 进行计数,当沙包被踢够了指定的次数时,将会向圈中的所有玩者都发送一条特殊的 ‘exit’ 消息。
注意,当全局变量debug为非零的时候,还有个函数debugPrint可以输出信息。我们可以使这游戏输出到标准输出,但当计时的时候,这会影响精确度。
我们来运行这个程序,并检查其是否正常工作:
C:Documents and SettingsgrantDesktopwhy_stacklesscode>c:python24python.exe hackysackthreaded.py 1 got hackeysack from 1 1 kicking hackeysack to 4 4 got hackeysack from 1 4 kicking hackeysack to 0 0 got hackeysack from 4 0 kicking hackeysack to 1 1 got hackeysack from 0 1 kicking hackeysack to 3 3 got hackeysack from 1 3 kicking hackeysack to 3 4 is going home 2 is going home 1 is going home 0 is going home 1 is going home C:Documents and SettingsgrantDesktopwhy_stacklesscode>
如我们所见,所有玩者到了一起,并很快地进行了一场游戏。现在,我们对若干次实验运行过程进行计时。Python标准库中有一个 timeit.py 程序,可以用作此目的。那么,我们也同时关掉调试输出:
C:Documents and SettingsgrantDesktopwhy_stacklesscode>c:python24python.exe c:Python24libtimeit.py -s "import hackysackthreaded" hackysackthreaded.runit(10,1000,0) 10 loops, best of 3: 183 msec per loop
在我的机器上,十个玩者共进行1000次传球,共使用了183毫秒。我们来增加玩者的数量:
C:Documents and SettingsgrantDesktopwhy_stacklesscode>c:python24python.exe c:Python24libtimeit.py -s "import hackeysackthreaded" hackeysackthreaded.runit(100,1000,0) 10 loops, best of 3: 231 msec per loop C:Documents and SettingsgrantDesktopwhy_stacklesscode>c:python24python.exe c:Python24libtimeit.py -s "import hackysackthreaded" hackysackthreaded.runit(1000,1000,0) 10 loops, best of 3: 681 msec per loop C:Documents and SettingsgrantDesktopwhy_stacklesscode>c:python24python.exe c:Python24libtimeit.py -s "import hackysackthreaded" hackysackthreaded.runit(10000,1000,0) Traceback (most recent call last): File "c:Python24libtimeit.py", line 255, in main x = t.timeit(number) File "c:Python24libtimeit.py", line 161, in timeit timing = self.inner(it, self.timer) File "<timeit-src>", line 6, in inner File ".hackeysackthreaded.py", line 58, in runit hackysacker(`i`,circle) File ".hackeysackthreaded.py", line 14, in __init__ thread.start_new_thread(self.messageLoop,()) error: can't start new thread
在我的3GHz、1G内存的机器上,当尝试10,000个线程的时候出现了错误。就不想拿出这详细的输出内容来扰人了,只是通过若干实验与出错过程 得出,在我机器上,此程序从1100个线程左右开始出错。另请注意,1000个线程时候所耗用的时间,是10个线程时候的大约三倍。
import stackless import random import sys class hackysacker: counter = 0 def __init__(self,name,circle): self.name = name self.circle = circle circle.append(self) self.channel = stackless.channel() stackless.tasklet(self.messageLoop)() def incrementCounter(self): hackysacker.counter += 1 if hackysacker.counter >= turns: while self.circle: self.circle.pop().channel.send('exit') def messageLoop(self): while 1: message = self.channel.receive() if message == 'exit': return debugPrint("%s got hackeysack from %s" % (self.name, message.name)) kickTo = self.circle[random.randint(0,len(self.circle)-1)] while kickTo is self: kickTo = self.circle[random.randint(0,len(self.circle)-1)] debugPrint("%s kicking hackeysack to %s" % (self.name, kickTo.name)) self.incrementCounter() kickTo.channel.send(self) def debugPrint(x): if debug:print x debug = 5 hackysackers = 5 turns = 1 def runit(hs=5,ts=5,dbg=1): global hackysackers,turns,debug hackysackers = hs turns = ts debug = dbg hackysacker.counter = 0 circle = [] one = hackysacker('1',circle) for i in range(hackysackers): hackysacker(`i`,circle) one.channel.send(one) try: stackless.run() except TaskletExit: pass if __name__ == "__main__": runit()
以上代码实质上与线程版本是等价的,主要区别仅在于我们使用微进程来代替线程,并且使用通道代替Queue来进行切换。让我们运行它,并检查输出:
C:Documents and SettingsgrantDesktopwhy_stacklesscode>c:Python24python.exe hackysackstackless.py 1 got hackeysack from 1 1 kicking hackeysack to 1 1 got hackeysack from 1 1 kicking hackeysack to 4 4 got hackeysack from 1 4 kicking hackeysack to 1 1 got hackeysack from 4 1 kicking hackeysack to 4 4 got hackeysack from 1 4 kicking hackeysack to 0
工作情况确如所料。现在来计时:
C:Documents and SettingsgrantDesktopwhy_stacklesscode>c:Python24python.exe c:Python24libtimeit.py -s"import hackysackstackless" hackysackstackless.runit(10,1000,0) 100 loops, best of 3: 19.7 msec per loop
其仅用了19.7毫秒,速度几乎是线程版本的10倍。现在我们同样开始增加微线程的数量:
C:Documents and SettingsgrantDesktopwhy_stacklesscode>c:Python24python.exe c:Python24libtimeit.py -s"import hackysackstackless" hackysackstackless.runit(100,1000,0) 100 loops, best of 3: 19.7 msec per loop C:Documents and SettingsgrantDesktopwhy_stacklesscode>c:Python24python.exe c:Python24libtimeit.py -s"import hackysackstackless" hackysackstackless.runit(1000,1000,0) 10 loops, best of 3: 26.9 msec per loop C:Documents and SettingsgrantDesktopwhy_stacklesscode>c:Python24python.exe c:Python24libtimeit.py -s"import hackysackstackless" hackysackstackless.runit(10000,1000,0) 10 loops, best of 3: 109 msec per loop C:Documents and SettingsgrantDesktopwhy_stacklesscode>c:Python24python.exe c:Python24libtimeit.py -s"import hackysackstackless" hackysackstackless.runit(100000,1000,0) 10 loops, best of 3: 1.07 sec per loop
甚至直到10,000个线程的时候,那时线程版本早已不能运行了,而这个仍然可以比线程版本在10个线程的时候运行的还快。
这里我在尽量保持代码的简洁,因此你可以相信我的话:计时时间的增长仅仅在于初始化游戏圈子的部分,而真正进行游戏的时间则是一直不变的,不管使用 10个微线程,还是10,000个。这归因于通道的工作方式:当它们收到消息的时候,是立即进行阻塞和恢复操作的。另一方面,各个操作系统线程则是轮番检 查自己的队列里是否有了东西,这意味着,跑着越多的线程,性能就变得越差。
但愿我已经成功地演示了,微线程的运行至少比操作系统线程快一个数量级,并具备远高于后者的可伸缩性。关于操作系统线程的一般常识是:(1)尽量不要使用它,(2)如果非用不可,就能少用一点就少用一点。而stackless的微线程则使我们从这些限制中解放出来。