Stackless Python并发式编程介绍

胡永逸
2023-12-01

转载自   http://blog.csdn.net/changbaohua/article/details/3777410

截取部分自己需要看的内容

1 介绍

1.1 为什么要使用Stackless

摘自 stackless 网站。

Note

Stackless Python 是Python编程语言的一个增强版本,它使程序员从基于线程的编程方式中获得好处,并避免传统线程所带来的性能与复杂度问题。Stackless为 Python带来的微线程扩展,是一种低开销、轻量级的便利工具,如果使用得当,可以获益如下:

  • 改进程序结构
  • 增进代码可读性
  • 提高编程人员生产力

以上是Stackless Python很简明的释义,但其对我们意义何在?——就在于Stackless提供的并发建模工具,比目前其它大多数传统编程语言所提供的,都更加易用: 不仅是Python自身,也包括Java、C++,以及其它。尽管还有其他一些语言提供并发特性,可它们要么是主要用于学术研究的(如 Mozart/Oz),要么是罕为使用、或用于特殊目的的专业语言(如Erlang)。而使用stackless,你将会在Python本身的所有优势之 上,在一个(但愿)你已经很熟悉的环境中,再获得并发的特性。

这自然引出了个问题:为什么要并发?

1.1.1 现实世界就是并发的

现实世界就是“并发”的,它是由一群事物(或“演员”)所组成,而这些事物以一种对彼此所知有限的、松散耦合的方式相互作用。传说中面向对象编程有 一个好处,就是对象能够对现实的世界进行模拟。这在一定程度上是正确的,面向对象编程很好地模拟了对象个体,但对于这些对象个体之间的交互,却无法以一种 理想的方式来表现。例如,如下代码实例,有什么问题?

def familyTacoNight():
    husband.eat(dinner)
    wife.eat(dinner)
    son.eat(dinner)
    daughter.eat(dinner)

第一印象,没问题。但是,上例中存在一个微妙的安排:所有事件是次序发生的,即:直到丈夫吃完饭,妻子才开始吃;儿子则一直等到母亲吃完才吃;而女 儿则是最后一个。在现实世界中,哪怕是丈夫还堵车在路上,妻子、儿子和女儿仍然可以该吃就吃,而要在上例中的话,他们只能饿死了——甚至更糟:永远没有人 会知道这件事,因为他们永远不会有机会抛出一个异常来通知这个世界!

1.1.2 并发可能是(仅仅可能是)下一个重要的编程范式

我个人相信,并发将是软件世界里的下一个重要范式。随着程序变得更加复杂和耗费资源,我们已经不能指望摩尔定律来每年给我们提供更快的CPU了,当 前,日常使用的个人计算机的性能提升来自于多核与多CPU机。一旦单个CPU的性能达到极限,软件开发者们将不得不转向分布式模型,靠多台计算机的互相协 作来建立强大的应用(想想GooglePlex)。为了取得多核机和分布式编程的优势,并发将很快成为做事情的方式的事实标准。

1.2 安装stackless

安装Stackless的细节可以在其网站上找到。现在Linux用户可以通过SubVersion取得源代码并编译;而对于Windows用户, 则有一个.zip文件供使用,需要将其解压到现有的Python安装目录中。接下来,本教程假设Stackless Python已经安装好了,可以工作,并且假设你对Python语言本身有基本的了解。

2 stackless起步

本章简要介绍了 stackless 的基本概念,后面章节将基于这些基础,来展示更加实用的功能。

2.1 微进程(tasklet)

微进程是stackless的基本构成单元,你可以通过提供任一个Python可调用对象(通常为函数或类的方法)来建立它,这将建立一个微进程并将其添加到调度器。这是一个快速演示:

Python 2.4.3 Stackless 3.1b3 060504 (#69, May  3 2006, 19:20:41) [MSC v.1310 32
bit (Intel)] on win32
Type "help", "copyright", "credits" or "license" for more information.
>>> import stackless
>>>
>>> def print_x(x):
...     print x
...
>>> stackless.tasklet(print_x)('one')
<stackless.tasklet object at 0x00A45870>
>>> stackless.tasklet(print_x)('two')
<stackless.tasklet object at 0x00A45A30>
>>> stackless.tasklet(print_x)('three')
<stackless.tasklet object at 0x00A45AB0>
>>>
>>> stackless.run()
one
two
three
>>>

注意,微进程将排起队来,并不运行,直到调用 stackless.run() 。

2.2 调度器(scheduler)

调度器控制各个微进程运行的顺序。如果刚刚建立了一组微进程,它们将按照建立的顺序来执行。在现实中,一般会建立一组可以再次被调度的微进程,好让每个都有轮次机会。一个快速演示:

Python 2.4.3 Stackless 3.1b3 060504 (#69, May  3 2006, 19:20:41) [MSC v.1310 32
bit (Intel)] on win32
Type "help", "copyright", "credits" or "license" for more information.
>>> import stackless
>>>
>>> def print_three_times(x):
...     print "1:", x
...     stackless.schedule()
...     print "2:", x
...     stackless.schedule()
...     print "3:", x
...     stackless.schedule()
...
>>>
>>> stackless.tasklet(print_three_times)('first')
<stackless.tasklet object at 0x00A45870>
>>> stackless.tasklet(print_three_times)('second')
<stackless.tasklet object at 0x00A45A30>
>>> stackless.tasklet(print_three_times)('third')
<stackless.tasklet object at 0x00A45AB0>
>>>
>>> stackless.run()
1: first
1: second
1: third
2: first
2: second
2: third
3: first
3: second
3: third
>>>

注意:当调用 stackless.schedule() 的时候,当前活动微进程将暂停执行,并将自身重新插入到调度器队列的末尾,好让下一个微进程被执行。一旦在它前面的所有其他微进程都运行过了,它将从上次 停止的地方继续开始运行。这个过程会持续,直到所有的活动微进程都完成了运行过程。这就是使用stackless达到合作式多任务的方式。

2.3 通道(channel)

通道使得微进程之间的信息传递成为可能。它做到了两件事:

  1. 能够在微进程之间交换信息。
  2. 能够控制运行的流程。

又一个快速演示:

C:>c:python24python
Python 2.4.3 Stackless 3.1b3 060504 (#69, May  3 2006, 19:20:41) [MSC v.1310 32
bit (Intel)] on win32
Type "help", "copyright", "credits" or "license" for more information.
>>> import stackless
>>>
>>> channel = stackless.channel()
>>>
>>> def receiving_tasklet():
...     print "Recieving tasklet started"
...     print channel.receive()
...     print "Receiving tasklet finished"
...
>>> def sending_tasklet():
...     print "Sending tasklet started"
...     channel.send("send from sending_tasklet")
...     print "sending tasklet finished"
...
>>> def another_tasklet():
...     print "Just another tasklet in the scheduler"
...
>>> stackless.tasklet(receiving_tasklet)()
<stackless.tasklet object at 0x00A45B30>
>>> stackless.tasklet(sending_tasklet)()
<stackless.tasklet object at 0x00A45B70>
>>> stackless.tasklet(another_tasklet)()
<stackless.tasklet object at 0x00A45BF0>
>>>
>>> stackless.run()
Recieving tasklet started
Sending tasklet started
send from sending_tasklet
Receiving tasklet finished
Just another tasklet in the scheduler
sending tasklet finished
>>>

接收的微进程调用 channel.receive() 的时候,便阻塞住,这意味着该微进程暂停执行,直到有信息从这个通道送过来。除了往这个通道发送信息以外,没有其他任何方式可以让这个微进程恢复运行。

若有其他微进程向这个通道发送了信息,则不管当前的调度到了哪里,这个接收的微进程都立即恢复执行;而发送信息的微进程则被转移到调度列表的末尾,就像调用了 stackless.schedule() 一样。

同样注意,发送信息的时候,若当时没有微进程正在这个通道上接收,也会使当前微进程阻塞:

>>> stackless.tasklet(sending_tasklet)()
<stackless.tasklet object at 0x00A45B70>
>>> stackless.tasklet(another_tasklet)()
<stackless.tasklet object at 0x00A45BF0>
>>>
>>> stackless.run()
Sending tasklet started
Just another tasklet in the scheduler
>>>
>>> stackless.tasklet(another_tasklet)()
<stackless.tasklet object at 0x00A45B30>
>>> stackless.run()
Just another tasklet in the scheduler
>>>
>>> #Finally adding the receiving tasklet
...
>>> stackless.tasklet(receiving_tasklet)()
<stackless.tasklet object at 0x00A45BF0>
>>>
>>> stackless.run()
Recieving tasklet started
send from sending_tasklet
Receiving tasklet finished
sending tasklet finished

发送信息的微进程,只有在成功地将数据发送到了另一个微进程之后,才会重新被插入到调度器中。

2.4 总结

以上涵盖了stackless的大部分功能。似乎不多是吧?——我们只使用了少许对象,和大约四五个函数调用,来进行操作。但是,使用这种简单的API作为基本建造单元,我们可以开始做一些真正有趣的事情。

3 协程(coroutine)

3.1 子例程的问题

大多数传统编程语言具有子例程的概念。一个子例程被另一个例程(可能还是其它某个例程的子例程)所调用,或返回一个结果,或不返回结果。从定义上说,一个子例程是从属于其调用者的。

见下例:

def ping():
    print "PING"
    pong()

def pong():
    print "PONG"
    ping()

ping()

有经验的编程者会看到这个程序的问题所在:它导致了堆栈溢出。如果运行这个程序,它将显示一大堆讨厌的跟踪信息,来指出堆栈空间已经耗尽。

3.1.1 堆栈

我仔细考虑了,自己对C语言堆栈的细节究竟了解多少,最终还是决定完全不去讲它。似乎,其他人对其所尝试的描述,以及图表,只有本身已经理解了的人才能看得懂。我将试着给出一个最简单的说明,而对其有更多兴趣的读者可以从网上查找更多信息。

每当一个子例程被调用,都有一个“栈帧”被建立,这是用来保存变量,以及其他子例程局部信息的区域。于是,当你调用 ping() ,则有一个栈帧被建立,来保存这次调用相关的信息。简言之,这个帧记载着 ping 被调用了。当再调用 pong() ,则又建立了一个栈帧,记载着 pong 也被调用了。这些栈帧是串联在一起的,每个子例程调用都是其中的一环。就这样,堆栈中显示: ping 被调用所以 pong 接下来被调用。显然,当 pong() 再调用 ping() ,则使堆栈再扩展。下面是个直观的表示:

堆栈
1ping 被调用
2ping 被调用,所以 pong 被调用
3ping 被调用,所以 pong 被调用,所以 ping 被调用
4ping 被调用,所以 pong 被调用,所以 ping 被调用,所以 pong 被调用
5ping 被调用,所以 pong 被调用,所以 ping 被调用,所以 pong 被调用,所以 ping 被调用
6ping 被调用,所以 pong 被调用,所以 ping 被调用,所以 pong 被调用,所以 ping 被调用……

现在假设,这个页面的宽度就表示系统为堆栈所分配的全部内存空间,当其顶到页面的边缘的时候,将会发生溢出,系统内存耗尽,即术语“堆栈溢出”。

3.1.2 那么,为什么要使用堆栈?

上例是有意设计的,用来体现堆栈的问题所在。在大多数情况下,当每个子例程返回的时候,其栈帧将被清除掉,就是说堆栈将会自行实现清理过程。这一般 来说是件好事,在C语言中,堆栈就是一个不需要编程者来手动进行内存管理的区域。很幸运,Python程序员也不需要直接来担心内存管理与堆栈。但是由于 Python解释器本身也是用C实现的,那些实现者们可是需要担心这个的。使用堆栈是会使事情方便,除非我们开始调用那种从不返回的函数,如上例中的,那 时候,堆栈的表现就开始和程序员别扭起来,并耗尽可用的内存。

3.2 走进协程

此时,将堆栈弄溢出是有点愚蠢的。 ping() 和 pong() 本不是真正意义的子例程,因为其中哪个也不从属于另一个,它们是“协程”,处于同等的地位,并可以彼此间进行无缝通信。

堆栈
1ping 被调用
2pong 被调用
3ping 被调用
4pong 被调用
5ping 被调用
6pong 被调用

在stackless中,我们使用通道来建立协程。还记得吗,通道所带来的两个好处中的一个,就是能够控制微进程之间运行的流程。使用通道,我们可以在 ping 和 pong 这两个协程之间自由来回,要多少次就多少次,都不会堆栈溢出:

#
# pingpong_stackless.py
#

import stackless

ping_channel = stackless.channel()
pong_channel = stackless.channel()

def ping():
    while ping_channel.receive(): #在此阻塞
        print "PING"
        pong_channel.send("from ping")

def pong():
    while pong_channel.receive():
        print "PONG"
        ping_channel.send("from pong")

stackless.tasklet(ping)()
stackless.tasklet(pong)()

# 我们需要发送一个消息来初始化这个游戏的状态
# 否则,两个微进程都会阻塞
stackless.tasklet(ping_channel.send)('startup')

stackless.run()

你可以运行这个程序要多久有多久,它都不会崩溃,且如果你检查其内存使用量(使用Windows的任务管理器或Linux的top命令),将会发现 使用量是恒定的。这个程序的协程版本,不管运行一分钟还是一天,使用的内存都是一样的。而如果你检查原先那个递归版本的内存用量,则会发现其迅速增长,直 到崩溃。

3.3 总结

是否还记得,先前我提到过,那个代码的递归版本,有经验的程序员会一眼看出毛病。但老实说,这里面并没有什么“计算机科学”方面的原因在阻碍它的正 常工作,有些让人坚信的东西,其实只是个与实现细节有关的小问题——只因为大多数传统编程语言都使用堆栈。某种意义上说,有经验的程序员都是被洗了脑,从 而相信这是个可以接受的问题。而stackless,则真正察觉了这个问题,并除掉了它。

4 轻量级线程

与当今的操作系统中内建的、和标准Python代码中所支持的普通线程相比,“微线程”要更为轻量级,正如其名称所暗示。它比传统线程占用更少的内存,并且微线程之间的切换,要比传统线程之间的切换更加节省资源。

为了准确说明微线程的效率究竟比传统线程高多少,我们用两者来写同一个程序。

4.1 hackysack模拟

Hackysack是一种游戏,就是一伙脏乎乎的小子围成一个圈,来回踢一个装满了豆粒的沙包,目标是不让这个沙包落地,当传球给别人的时候,可以耍各种把戏。踢沙包只可以用脚。

在我们的简易模拟中,我们假设一旦游戏开始,圈里人数就是恒定的,并且每个人都是如此厉害,以至于如果允许的话,这个游戏可以永远停不下来。

4.2 游戏的传统线程版本

import thread
import random
import sys
import Queue

class hackysacker:
    counter = 0
    def __init__(self,name,circle):
        self.name = name
        self.circle = circle
        circle.append(self)
        self.messageQueue = Queue.Queue()

        thread.start_new_thread(self.messageLoop,())

    def incrementCounter(self):
        hackysacker.counter += 1
        if hackysacker.counter >= turns:
            while self.circle:
                hs = self.circle.pop()
                if hs is not self:
                    hs.messageQueue.put('exit')
            sys.exit()

    def messageLoop(self):
        while 1:
            message = self.messageQueue.get()
            if message == "exit":
                debugPrint("%s is going home" % self.name)
                sys.exit()
            debugPrint("%s got hackeysack from %s" % (self.name, message.name))
            kickTo = self.circle[random.randint(0,len(self.circle)-1)]
            debugPrint("%s kicking hackeysack to %s" % (self.name, kickTo.name))
            self.incrementCounter()
            kickTo.messageQueue.put(self)

def debugPrint(x):
    if debug:
        print x

debug=1
hackysackers=5
turns = 5

def runit(hs=10,ts=10,dbg=1):
    global hackysackers,turns,debug
    hackysackers = hs
    turns = ts
    debug = dbg

    hackysacker.counter= 0
    circle = []
    one = hackysacker('1',circle)

    for i in range(hackysackers):
        hackysacker(`i`,circle)

    one.messageQueue.put(one)

    try:
        while circle:
            pass
    except:
        #有时我们在清理过程中会遇到诡异的错误。
        pass

if __name__ == "__main__":
    runit(dbg=1)

一个“玩者”类的初始化用到了其名字,和一个指向包含了所有玩者的全局列表 circle 的引用,还有一个继承自Python标准库中的Queue类的消息队列。

Queue这个类的作用,与stackless的通道类似。它包含 put() 和 get() 方法,在一个空的Queue上调用 put() 会阻塞,直到另一个线程调用 put() 将数据送入Queue中为止。Queue这个类被设计为能与操作系统级的线程高效合作。

__init__ 方法接下来使用Python标准库中的thread模块新建一个线程,并在新线程中开始了一个消息循环。此消息循环是个无限循环,不停地处理队列中的消息。如果其收到一个特殊的消息 ‘exit’ ,则结束这个线程。

如果收到了另一个消息——指定其收到了沙包,玩者则从圈中随机选取一个其他玩者,通过向其发送一条消息来指定,将沙包再踢给它。

由类成员变量 hackysacker.counter 进行计数,当沙包被踢够了指定的次数时,将会向圈中的所有玩者都发送一条特殊的 ‘exit’ 消息。

注意,当全局变量debug为非零的时候,还有个函数debugPrint可以输出信息。我们可以使这游戏输出到标准输出,但当计时的时候,这会影响精确度。

我们来运行这个程序,并检查其是否正常工作:

C:Documents and SettingsgrantDesktopwhy_stacklesscode>c:python24python.exe
hackysackthreaded.py

1 got hackeysack from 1
1 kicking hackeysack to 4
4 got hackeysack from 1
4 kicking hackeysack to 0
0 got hackeysack from 4
0 kicking hackeysack to 1
1 got hackeysack from 0
1 kicking hackeysack to 3
3 got hackeysack from 1
3 kicking hackeysack to 3
4 is going home
2 is going home
1 is going home
0 is going home
1 is going home

C:Documents and SettingsgrantDesktopwhy_stacklesscode>

如我们所见,所有玩者到了一起,并很快地进行了一场游戏。现在,我们对若干次实验运行过程进行计时。Python标准库中有一个 timeit.py 程序,可以用作此目的。那么,我们也同时关掉调试输出:

C:Documents and SettingsgrantDesktopwhy_stacklesscode>c:python24python.exe c:Python24libtimeit.py -s "import hackysackthreaded" hackysackthreaded.runit(10,1000,0)
10 loops, best of 3: 183 msec per loop

在我的机器上,十个玩者共进行1000次传球,共使用了183毫秒。我们来增加玩者的数量:

C:Documents and SettingsgrantDesktopwhy_stacklesscode>c:python24python.exe c:Python24libtimeit.py -s "import hackeysackthreaded" hackeysackthreaded.runit(100,1000,0)
10 loops, best of 3: 231 msec per loop

C:Documents and SettingsgrantDesktopwhy_stacklesscode>c:python24python.exe c:Python24libtimeit.py -s "import hackysackthreaded" hackysackthreaded.runit(1000,1000,0)
10 loops, best of 3: 681 msec per loop

C:Documents and SettingsgrantDesktopwhy_stacklesscode>c:python24python.exe c:Python24libtimeit.py -s "import hackysackthreaded" hackysackthreaded.runit(10000,1000,0)
Traceback (most recent call last):
  File "c:Python24libtimeit.py", line 255, in main
    x = t.timeit(number)
  File "c:Python24libtimeit.py", line 161, in timeit
    timing = self.inner(it, self.timer)
  File "<timeit-src>", line 6, in inner
  File ".hackeysackthreaded.py", line 58, in runit
    hackysacker(`i`,circle)
  File ".hackeysackthreaded.py", line 14, in __init__
    thread.start_new_thread(self.messageLoop,())
error: can't start new thread

在我的3GHz、1G内存的机器上,当尝试10,000个线程的时候出现了错误。就不想拿出这详细的输出内容来扰人了,只是通过若干实验与出错过程 得出,在我机器上,此程序从1100个线程左右开始出错。另请注意,1000个线程时候所耗用的时间,是10个线程时候的大约三倍。

4.3 stackless

import stackless
import random
import sys

class hackysacker:
    counter = 0
    def __init__(self,name,circle):
        self.name = name
        self.circle = circle
        circle.append(self)
        self.channel = stackless.channel()

        stackless.tasklet(self.messageLoop)()

    def incrementCounter(self):
        hackysacker.counter += 1
        if hackysacker.counter >= turns:
            while self.circle:
                self.circle.pop().channel.send('exit')

    def messageLoop(self):
        while 1:
            message = self.channel.receive()
            if message == 'exit':
                return
            debugPrint("%s got hackeysack from %s" % (self.name, message.name))
            kickTo = self.circle[random.randint(0,len(self.circle)-1)]
            while kickTo is self:
                kickTo = self.circle[random.randint(0,len(self.circle)-1)]
            debugPrint("%s kicking hackeysack to %s" % (self.name, kickTo.name))
            self.incrementCounter()
            kickTo.channel.send(self)

def debugPrint(x):
    if debug:print x

debug = 5
hackysackers = 5
turns = 1

def runit(hs=5,ts=5,dbg=1):
    global hackysackers,turns,debug
    hackysackers = hs
    turns = ts
    debug = dbg

    hackysacker.counter = 0
    circle = []
    one = hackysacker('1',circle)

    for i in range(hackysackers):
        hackysacker(`i`,circle)

    one.channel.send(one)

    try:
        stackless.run()
    except TaskletExit:
        pass

if __name__ == "__main__":
    runit()

以上代码实质上与线程版本是等价的,主要区别仅在于我们使用微进程来代替线程,并且使用通道代替Queue来进行切换。让我们运行它,并检查输出:

C:Documents and SettingsgrantDesktopwhy_stacklesscode>c:Python24python.exe hackysackstackless.py
1 got hackeysack from 1
1 kicking hackeysack to 1
1 got hackeysack from 1
1 kicking hackeysack to 4
4 got hackeysack from 1
4 kicking hackeysack to 1
1 got hackeysack from 4
1 kicking hackeysack to 4
4 got hackeysack from 1
4 kicking hackeysack to 0

工作情况确如所料。现在来计时:

C:Documents and SettingsgrantDesktopwhy_stacklesscode>c:Python24python.exe c:Python24libtimeit.py -s"import hackysackstackless" hackysackstackless.runit(10,1000,0)
100 loops, best of 3: 19.7 msec per loop

其仅用了19.7毫秒,速度几乎是线程版本的10倍。现在我们同样开始增加微线程的数量:

C:Documents and SettingsgrantDesktopwhy_stacklesscode>c:Python24python.exe c:Python24libtimeit.py -s"import hackysackstackless" hackysackstackless.runit(100,1000,0)
100 loops, best of 3: 19.7 msec per loop

C:Documents and SettingsgrantDesktopwhy_stacklesscode>c:Python24python.exe c:Python24libtimeit.py -s"import hackysackstackless" hackysackstackless.runit(1000,1000,0)
10 loops, best of 3: 26.9 msec per loop

C:Documents and SettingsgrantDesktopwhy_stacklesscode>c:Python24python.exe c:Python24libtimeit.py -s"import hackysackstackless" hackysackstackless.runit(10000,1000,0)
10 loops, best of 3: 109 msec per loop

C:Documents and SettingsgrantDesktopwhy_stacklesscode>c:Python24python.exe c:Python24libtimeit.py -s"import hackysackstackless" hackysackstackless.runit(100000,1000,0)
10 loops, best of 3: 1.07 sec per loop

甚至直到10,000个线程的时候,那时线程版本早已不能运行了,而这个仍然可以比线程版本在10个线程的时候运行的还快。

这里我在尽量保持代码的简洁,因此你可以相信我的话:计时时间的增长仅仅在于初始化游戏圈子的部分,而真正进行游戏的时间则是一直不变的,不管使用 10个微线程,还是10,000个。这归因于通道的工作方式:当它们收到消息的时候,是立即进行阻塞和恢复操作的。另一方面,各个操作系统线程则是轮番检 查自己的队列里是否有了东西,这意味着,跑着越多的线程,性能就变得越差。

4.4 总结

但愿我已经成功地演示了,微线程的运行至少比操作系统线程快一个数量级,并具备远高于后者的可伸缩性。关于操作系统线程的一般常识是:(1)尽量不要使用它,(2)如果非用不可,就能少用一点就少用一点。而stackless的微线程则使我们从这些限制中解放出来。


 类似资料: