执行效率
CPython 作为最流行的 Python 环境,对于 CPU 密集型任务较慢,而 PyPy 则较快。
使用稍作改动的 David Beazley 的 CPU 密集测试代码(增加了多重循环进行多轮测试), 可以看到 CPython 与 PyPy 之间的执行差距。
# PyPy
$ ./pypy -V
Python 2.7.1 (7773f8fc4223, Nov 18 2011, 18:47:10)
[PyPy 1.7.0 with GCC 4.4.3]
$ ./pypy measure2.py
0.0683999061584
0.0483210086823
0.0388588905334
0.0440690517426
0.0695300102234
# CPython
$ ./python -V
Python 2.7.1
$ ./python measure2.py
1.06774401665
1.45412397385
1.51485204697
1.54693889618
1.60109114647
运行环境
The GIL
GIL (Global interpreter lock 全局解释器锁) 是 Python 支持多线程并行操作的方式。 Python 的内存管理不是完全线程安全的,所以 GIL 被创造出来避免多线程同时运行同一个 Python 代码。
David Beazley 有一个关于 GIL 如何工作的 指南 。他也讨论了 Python3.2 中的 新 GIL 他的结论是为了最大化一个 Python 程序的性能,应该对 GIL 工作方式有一个深刻的理解 —— 它如何影响您的特定程序,您拥有多少核,以及您程序瓶颈在哪。
C 扩展
当写一个C扩展时必须 特别关注 在解释器中注册您的线程。
GIL 全局锁
当写一个 C 扩展时必须 重点注意 在解释器中注册你的线程。
C 扩展
Cython
Cython 是 Python 语言的一个超集,它允许你为 Python 写 C 或 C++ 模块。Cython 也使得你能够从已编译的 C 库中调用函数。使用 Cython 让您得以发挥 Python 的变量与操作的强类型优势。
下面是一个 Cython 中的强类型例子:
def primes(int kmax):
"""Calculation of prime numbers with additional
Cython keywords"""
cdef int n, k, i
cdef int p[1000]
result = []
if kmax > 1000:
kmax = 1000
k = 0
n = 2
while k < kmax:
i = 0
while i < k and n % p[i] != 0:
i = i + 1
if i == k:
p[k] = n
k = k + 1
result.append(n)
n = n + 1
return result
请尝试着将这个『附加关键字寻找素数算法』的 Cython 实现与下面这个纯 Python 实现比较:
def primes(kmax):
"""Calculation of prime numbers in standard Python syntax"""
p = range(1000)
result = []
if kmax > 1000:
kmax = 1000
k = 0
n = 2
while k < kmax:
i = 0
while i < k and n % p[i] != 0:
i = i + 1
if i == k:
p[k] = n
k = k + 1
result.append(n)
n = n + 1
return result
注意在 Cython 版本里,创建一个 Python 列表时,声明了会被编译为 C 类型的整型和整型数组:
def primes(int kmax):
"""Calculation of prime numbers with additional
Cython keywords"""
cdef int n, k, i
cdef int p[1000]
result = []
def primes(kmax):
"""Calculation of prime numbers in standard Python syntax"""
p = range(1000)
result = []
它们之间有什么差别呢?在上面的 Cython 版本中,您可以看到变量类型与整型数组像标准 C 一样被声明。 在上面的例子中,第三行的 cdef int n,k,i
这个附加类型声明(整型)使得 Cython 编译器得以产生比 第二个版本更有效率的 C 代码。标准 Python 代码以 .py 格式保存,而 Cython 以 .pyx 格式保存。
速度上有什么差异呢?看看这个:
import time
#activate pyx compiler
import pyximport
pyximport.install()
#primes implemented with Cython
import primesCy
#primes implemented with Python
import primes
print "Cython:"
t1= time.time()
print primesCy.primes(500)
t2= time.time()
print "Cython time: %s" %(t2-t1)
print ""
print "Python"
t1= time.time()
print primes.primes(500)
t2= time.time()
print "Python time: %s" %(t2-t1)
这两行代码需要一些说明:
import pyximport
pyximport.install()
pyximport 使得你可以导入 .pyx 文件,(像 primesCy.pyx
这样的)。 pyximport.install()
命令使 Python 解释器可以打开 Cython 编译器直接编译出 .so 格式的 C 库。Cython之后可以导入这个库到您的 Python 代码中,简便而有效。使用 time.time()
函数您可以比较两个不同的在查找 500 个素数的调用长的时间消耗差异。在一个标准笔记本中 (双核AMD E-450 1.6GHz),测量值是这样的:
Cython time: 0.0054 seconds
Python time: 0.0566 seconds
而这个是嵌入的 ARM beaglebone 机的输出结果:
Cython time: 0.0196 seconds
Python time: 0.3302 seconds
Pyrex
Shedskin?
并行运算
Concurrent.futures
concurrent.futures 模块是 Python 标准库中的一个模块,它提供了一个 『用于异步调用的高级接口』。 它抽象和简化了许多使用多个线程或进程并发的复杂细节,并允许你更专注于完成手头的任务。
concurrent.futures 模块提供了两个主要的类,即 ThreadPoolExecutor
和 ProcessPoolExecutor
。 ThreadPoolExecutor
将创建一个用户可以提交作业的工作线程池。当下一个工作线程可用时, 这些作业将在另一个线程中执行。
ProcessPoolExecutor
以相同的方式工作,它使用多进程而不是多线程作为工作池。这就可以避开 GIL 的问题,但是由于传递参数给工作进程的机制限制,只有可被 Pickle 序列化的对象才能够被执行并返回。
由于 GIL 的工作原理,一个较好的经验法则是当执行涉及很多阻塞(如通过网络发出请求)的任务时,使用 ThreadPoolExecutor
,而对高计算开销的任务使用 ProcessPoolExecutor
执行器。
使用两个执行器并行执行有两个主要方法。一个是使用 map(func, iterables)
方法。 这个函数除了能并行执行一切,它几乎和内置的 map()
函数一模一样 :
from concurrent.futures import ThreadPoolExecutor
import requests
def get_webpage(url):
page = requests.get(url)
return page
pool = ThreadPoolExecutor(max_workers=5)
my_urls = ['http://google.com/']*10 # 创建 url 列表 list
for page in pool.map(get_webpage, my_urls):
# 处理运行结果
print(page.text)
为了进一步的控制,submit(func, *args, **kwargs)
方法将调度一个可执行的调用 (如 func(*args, **kwargs)
),并返回一个代表可调用的 Future 执行对象。
Future 对象提供了可用于检查计划可调用进程的各种方法。这些包括:
cancel()
尝试取消调用。
cancelled()
如果调用被成功取消,返回 True。
running()
如果当前正在执行调用而且没被取消,则返回 True。
done()
如果调用被成功取消或完成运行,返回 True。
result()
返回调用返回的值。请注意,此调用将阻塞以至可调用对象有结果返回。
exception()
返回调用抛出的异常。如果没有抛出异常,将返回 None
。请注意,这和 result()
一样会阻塞。
add_done_callback(fn)
添加回调函数,并在所调用的可调用对象返回结果时执行(如 fn(future) )。
from concurrent.futures import ProcessPoolExecutor, as_completed
def is_prime(n):
if n % 2 == 0:
return n, False
sqrt_n = int(n**0.5)
for i in range(3, sqrt_n + 1, 2):
if n % i == 0:
return n, False
return n, True
PRIMES = [
112272535095293,
112582705942171,
112272535095293,
115280095190773,
115797848077099,
1099726899285419]
futures = []
with ProcessPoolExecutor(max_workers=4) as pool:
# Schedule the ProcessPoolExecutor to check if a number is prime
# and add the returned Future to our list of futures
for p in PRIMES:
fut = pool.submit(is_prime, p)
futures.append(fut)
# As the jobs are completed, print out the results
for number, result in as_completed(futures):
if result:
print("{} is prime".format(number))
else:
print("{} is not prime".format(number))
concurrent.futures 模块包含两个帮助函数来处理 Futures。as_completed(futures)
函数 返回 futures
列表的的迭代器,在 futures
结束时 yield
。
而 wait(futures)
函数则简单地阻塞,直到列表中所有的 futures
完成。
有关使用 concurrent.futures 模块的更多信息,请参阅官方文档。
Threading
标准库带有一个 threading 模块,允许用户手动处理多个线程。
在另一个线程中运行一个函数是一件非常简单的事情,传参一个可执行对象到 Thread
的构造函数中, 然后调用 start()
即可:
from threading import Thread
import requests
def get_webpage(url):
page = requests.get(url)
return page
some_thread = Thread(get_webpage, 'http://google.com/')
some_thread.start()
调用 join()
来等待线程终止:
some_thread.join()
调用 join()
后,建议要检查下线程是否仍然存活(因为 join
调用超时):
if some_thread.is_alive():
print("join() must have timed out.")
else:
print("Our thread has terminated.")
由于多个线程可以访问相同的内存部分,有时可能会出现两个或多个线程尝试同时写入同一资源的情况, 或者输出取决于某些事件的顺序或时序。 这被称为 竞争条件 。当这种情况发生时, 输出将会出现乱码,或者可能会遇到难以调试的问题。 stackoverflow 上的一个文章 是个很好的例子。
一个比较好的解决方法是在每个线程写入共享资源之前获取 Lock 。 锁可以通过环境上下文协议 ( with
语句)或直接使用 acquire()
和 release()
来获取和释放。 以下是一个(颇有争议的)例子:
from threading import Lock, Thread
file_lock = Lock()
def log(msg):
with file_lock:
open('website_changes.log', 'w') as f:
f.write(changes)
def monitor_website(some_website):
"""
监控网站,如果发生变化就写 log 到硬盘上
"""
while True:
changes = check_for_changes(some_website)
if changes:
log(changes)
websites = ['http://google.com/', ... ]
for website in websites:
t = Thread(monitor_website, website)
t.start()
在这里,我们有一堆线程检查站点列表中的更改,每当有任何更改时,它们尝试通过调用 log(changes)
将这些更改写入文件。 当调用 log()
时,它在 with file_lock:
处等待获取锁。 这样可以确保在任何时候只有一个线程正在写入文件。