基本介绍
一个基于greenlet的并发网络库。有了gevent后,不必向greenlet那样手动切换,而是当一个协程阻塞时,将自动切换到其他协程
import gevent
def test1():
print('test1.start')
gevent.sleep(0)
print('test1.end')
def test2():
print('test2.start')
gevent.sleep(0)
print('test2.end')
gevent.joinall([
gevent.spawn(test1),
gevent.spawn(test2)
])
补丁
python的socket库是阻塞式的,DNS无法并发解析,所以需要通过一定方法来解决这个问题
from gevent import monkey
monkey.patch_all()
monkey.patch_all()将把python中多有的标准库都替换成非阻塞时的
获取协程状态
started:判断协程是否已经启动
ready():判断协程是否已经停止
sucessful():判断协程是否已经成功运行并且没有抛出异常
value属性:如果运行成功并且有返回值,那么可以通过value获取
exception:在geenlet协程运行过程中抛出异常是不会抛出到协程外的,因此需要主动捕获异常
import gevent
def win():
return 'You Win'
def fail():
raise Exception('You Fail')
winner = gevent.spawn(win)
loser = gevent.spawn(fail)
print('winner.started =',winner.started)
print('loser.started =',loser.started)
try:
gevent.joinall([
winner,
loser
])
except Exception as e:
print(e.args)
print('winner.ready() =',winner.ready())
print('loser.ready() =',loser.ready())
print('winner.value =',winner.value)
print('loser.value =',loser.value)
print('winner.successful() =',winner.successful())
print('loser.successful() =',loser.successful())
print('loser.exception =',loser.exception)
try:
print(loser.get())
except:
print('exception from loser')
协程超时运行
可以再geven.joinall()函数传入timeout参数来设置超时时间,也可以设置全局超时时间:
import gevent
from gevent import Timeout
timeout = Timeout(2)
timeout.start()
def wait():
gevent.sleep(10)
try:
gevent.spawn(wait).join()
except Timeout:
print('Could not complete')
可以设置在with语句内,那么设置就只在with内有效
with Timeout(2):
gevent.sleep(10)
可以设置抛出的异常类型
with Timeout(2,Exception):
gevent.sleep(10)
协程间通信
wait():可以阻塞协程
set():可以唤醒所有阻塞的异常
import gevent
from gevent.event import Event
evt = Event()
def setter():
print('Wait for me')
gevent.sleep(3)
print('After 3 sec, I am done')
evt.set()
def waiter():
print('I am waiting')
evt.wait()
print('Finish waiting')
gevent.joinall([
gevent.spawn(setter),
gevent.spawn(waiter),
gevent.spawn(waiter),
gevent.spawn(waiter),
gevent.spawn(waiter),
gevent.spawn(waiter)
])
当调用Event.wait()函数时,协程会进入阻塞状态,直到调用set()函数后被唤醒。除了Event事件外,AsyncResult也有该功能,并且能在唤醒时传递数据
import gevent
from gevent.event import AsyncResult
aevt = AsyncResult()
def setter():
print('Waitint for me')
gevent.sleep(3)
print('After 3 sec, I am done')
aevt.set(value='Get Up')
def waiter():
print('I am waiting')
value = aevt.get()
print('I get a value:',value,id(gevent.getcurrent()))
gevent.joinall([
gevent.spawn(setter),
gevent.spawn(waiter).spawn(waiter)
])
信号量
acquire():获取信号量
release():释放信号量
当所有的信号都被获取后,剩下的协程只能等待某个释放才能继续执行
import gevent
from gevent.lock import BoundedSemaphore
sem = BoundedSemaphore(2)
def worker(r):
sem.acquire()
print(f'Worker {r} acquire a sem')
gevent.sleep(2)
sem.release()
print(f'Worker {r} release a sem')
gevent.joinall([
gevent.spawn(worker,i) for i in range(6)
])
协程本地变量
将变量放在local()中,即可将其作用域限制在该协程内,当其他协程想要访问另一个协程的变量时就会报错
import gevent
from gevent.local import local
data = local()
def f1():
data.x = 'f1'
print(data.x)
def f2():
try:
print(data.x)
except:
print('f2 has not data.x')
gevent.joinall([
gevent.spawn(f1).spawn(f2)
])
队列
队列是一个排序的数据集合,他有常见的put/get操作,但是它是一再Greenlet之间可以安全操作的方式来实现的。如果一个Greenlet从队列中取出一项,那么此项就不会被同时执行的其它Greenlet再取到。
import gevent
from gevent.queue import Queue
tasks = Queue()
def worker(n):
while not tasks.empty():
task = tasks.get()
print('Worker %s got task %s' % (n, task))
gevent.sleep(0)
print('Quitting time!')
def boss():
for i in xrange(1,25):
tasks.put_nowait(i)
gevent.spawn(boss).join()
gevent.joinall([
gevent.spawn(worker, 'steve'),
gevent.spawn(worker, 'john'),
gevent.spawn(worker, 'nancy'),
])
首先初始化一个Queue()实例。这里会先运行boss() 调用put_nowait()方法不阻塞的往队列里面放24个元素。然后下面依次从Queue里对数字进行消费,起了三个协程分别命名了不同的名字,使用get方法依次消费队列里面的数字直到消费完毕。
put和get操作都有非阻塞的版本,put_nowait和get_nowait不会阻塞, 然而在操作不能完成时抛出gevent.queue.Empty或gevent.queue.Full异常。同时Queue队列可以支持设置最大队列值,查看队列现在元素数量qsize(),队列是否为空empty(),队列是否满了full()等api在使用的时候最好也参考一下文档:参考文档
Group/Pool
组是一个运行中greenlet的集合,集合中的greenlet像一个组一样,会被共同管理和调度。它也兼饰了像Python的multiprocessing库那样的平和调度器的角色。
基本使用
import gevent
from gevent.pool import Group
def talk(msg):
for i in xrange(3):
print(msg)
g1 = gevent.spawn(talk, 'bar')
g2 = gevent.spawn(talk, 'foo')
group = Group()
group.add(g1)
group.add(g2)
group.join()
map
from gevent import getcurrent
from gevent.pool import Group
group = Group()
def hello_from(n):
print('Size of group %s' % len(group))
print('Hello from Greenlet %s' % id(getcurrent()))
return n
x = group.map(hello_from, xrange(3))
print type(x)
print x
这里使用了group.map()这个函数来取得各spawn的返回值。map()是由第二个参数控制迭代次数,并且传递给第一个参数值而运行的。拿这个函数举例,这里会返回一个list构成这个list的对象就是将迭代的参数传进函数运行之后的返回值。这里得到的结果是[0, 1, 2]
imap_unordered
import gevent
from gevent.pool import Group
def intensive(n):
gevent.sleep(3 - n)
return 'task', n
igroup = Group()
for i in igroup.imap_unordered(intensive, xrange(3)):
print(i)