当前位置: 首页 > 工具软件 > gevent > 使用案例 >

gevent学习介绍

丌官浩旷
2023-12-01

gevent


  1. 基本介绍
    一个基于greenlet的并发网络库。有了gevent后,不必向greenlet那样手动切换,而是当一个协程阻塞时,将自动切换到其他协程

    import gevent
    
    def test1():
        print('test1.start')
        gevent.sleep(0)
        print('test1.end')
    
    def test2():
        print('test2.start')
        gevent.sleep(0)
        print('test2.end')
    
    gevent.joinall([
            gevent.spawn(test1),
            gevent.spawn(test2)
        ])
    
  • gevent.spawn():创建一个协程并启动
  • gevent.sleep():会阻塞切换到其他协程继续运行
  • gevent.joinall([]):会等待所有传入的协程运行(与启动不同)结束后退出
  1. 补丁

    python的socket库是阻塞式的,DNS无法并发解析,所以需要通过一定方法来解决这个问题

    from gevent import monkey
    monkey.patch_all()
    

    monkey.patch_all()将把python中多有的标准库都替换成非阻塞时的

  2. 获取协程状态
    started:判断协程是否已经启动
    ready():判断协程是否已经停止
    sucessful():判断协程是否已经成功运行并且没有抛出异常
    value属性:如果运行成功并且有返回值,那么可以通过value获取
    exception:在geenlet协程运行过程中抛出异常是不会抛出到协程外的,因此需要主动捕获异常

    import gevent
    
    def win():
        return 'You Win'
    
    def fail():
        raise Exception('You Fail')
    
    
    winner = gevent.spawn(win)
    loser = gevent.spawn(fail)
    
    print('winner.started =',winner.started)
    print('loser.started =',loser.started)
    
    
    try:
        gevent.joinall([
                winner,
                loser
            ])
    except Exception as e:
        print(e.args)
    
    print('winner.ready() =',winner.ready())
    print('loser.ready() =',loser.ready())
    
    print('winner.value =',winner.value)
    print('loser.value =',loser.value)
    
    print('winner.successful() =',winner.successful())
    print('loser.successful() =',loser.successful())
    
    
    print('loser.exception =',loser.exception)
    
    try:
        print(loser.get())
    except:
        print('exception from loser')
    
  3. 协程超时运行

  • 可以再geven.joinall()函数传入timeout参数来设置超时时间,也可以设置全局超时时间:

    import gevent
    from gevent import Timeout
    
    timeout = Timeout(2)
    timeout.start()
    
    def wait():
        gevent.sleep(10)
    
    try:
        gevent.spawn(wait).join()
    except Timeout:
        print('Could not complete')
    
  • 可以设置在with语句内,那么设置就只在with内有效

    with Timeout(2):
        gevent.sleep(10)
    
  • 可以设置抛出的异常类型

    with Timeout(2,Exception):
        gevent.sleep(10)
    
  1. 协程间通信

    wait():可以阻塞协程
    set():可以唤醒所有阻塞的异常

    import gevent
    from gevent.event import Event
    
    evt = Event()
    
    def setter():
        print('Wait for me')
        gevent.sleep(3)
        print('After 3 sec, I am done')
        evt.set()
    
    def waiter():
        print('I am waiting')
        evt.wait()
        print('Finish waiting')
    
    gevent.joinall([
            gevent.spawn(setter),
            gevent.spawn(waiter),
            gevent.spawn(waiter),
            gevent.spawn(waiter),
            gevent.spawn(waiter),
            gevent.spawn(waiter)
        ])
    

    当调用Event.wait()函数时,协程会进入阻塞状态,直到调用set()函数后被唤醒。除了Event事件外,AsyncResult也有该功能,并且能在唤醒时传递数据

    import gevent
    from gevent.event import AsyncResult
    
    aevt = AsyncResult()
    
    def setter():
        print('Waitint for me')
        gevent.sleep(3)
        print('After 3 sec, I am done')
        aevt.set(value='Get Up')
    
    def waiter():
        print('I am waiting')
        value = aevt.get()
        print('I get a value:',value,id(gevent.getcurrent()))
    
    gevent.joinall([
            gevent.spawn(setter),
            gevent.spawn(waiter).spawn(waiter)
        ])
    
  2. 信号量
    acquire():获取信号量
    release():释放信号量
    当所有的信号都被获取后,剩下的协程只能等待某个释放才能继续执行

    import gevent
    from gevent.lock import BoundedSemaphore
    
    sem = BoundedSemaphore(2)
    
    def worker(r):
        sem.acquire()
        print(f'Worker {r} acquire a sem')
        gevent.sleep(2)
        sem.release()
        print(f'Worker {r} release a sem')
    
    gevent.joinall([
            gevent.spawn(worker,i) for i in range(6)
        ])
    
  3. 协程本地变量
    将变量放在local()中,即可将其作用域限制在该协程内,当其他协程想要访问另一个协程的变量时就会报错

    import gevent
    from gevent.local import local
    
    data = local()
    
    def f1():
        data.x = 'f1'
        print(data.x)
    
    def f2():
        try:
            print(data.x)
        except:
            print('f2 has not data.x')
    
    gevent.joinall([
            gevent.spawn(f1).spawn(f2)
        ])
    
  4. 队列

    队列是一个排序的数据集合,他有常见的put/get操作,但是它是一再Greenlet之间可以安全操作的方式来实现的。如果一个Greenlet从队列中取出一项,那么此项就不会被同时执行的其它Greenlet再取到。

    import gevent
    from gevent.queue import Queue
    
    tasks = Queue()
    
    def worker(n):
        while not tasks.empty():
            task = tasks.get()
            print('Worker %s got task %s' % (n, task))
            gevent.sleep(0)
    
        print('Quitting time!')
    
    def boss():
        for i in xrange(1,25):
            tasks.put_nowait(i)
    
    gevent.spawn(boss).join()
    
    gevent.joinall([
        gevent.spawn(worker, 'steve'),
        gevent.spawn(worker, 'john'),
        gevent.spawn(worker, 'nancy'),
    ])
    

    首先初始化一个Queue()实例。这里会先运行boss() 调用put_nowait()方法不阻塞的往队列里面放24个元素。然后下面依次从Queue里对数字进行消费,起了三个协程分别命名了不同的名字,使用get方法依次消费队列里面的数字直到消费完毕。

    put和get操作都有非阻塞的版本,put_nowait和get_nowait不会阻塞, 然而在操作不能完成时抛出gevent.queue.Empty或gevent.queue.Full异常。同时Queue队列可以支持设置最大队列值,查看队列现在元素数量qsize(),队列是否为空empty(),队列是否满了full()等api在使用的时候最好也参考一下文档:参考文档

  5. Group/Pool

    组是一个运行中greenlet的集合,集合中的greenlet像一个组一样,会被共同管理和调度。它也兼饰了像Python的multiprocessing库那样的平和调度器的角色。

  • 基本使用

    import gevent
    from gevent.pool import Group
    
    def talk(msg):
        for i in xrange(3):
            print(msg)
    
    g1 = gevent.spawn(talk, 'bar')
    g2 = gevent.spawn(talk, 'foo')
    
    group = Group()
    group.add(g1)
    group.add(g2)
    group.join()
    
  • map

    from gevent import getcurrent
    from gevent.pool import Group
    
    group = Group()
    
    
    def hello_from(n):
        print('Size of group %s' % len(group))
        print('Hello from Greenlet %s' % id(getcurrent()))
        return n
    
    x = group.map(hello_from, xrange(3))
    print type(x)
    print x
    

    这里使用了group.map()这个函数来取得各spawn的返回值。map()是由第二个参数控制迭代次数,并且传递给第一个参数值而运行的。拿这个函数举例,这里会返回一个list构成这个list的对象就是将迭代的参数传进函数运行之后的返回值。这里得到的结果是[0, 1, 2]

  • imap_unordered

    import gevent
    from gevent.pool import Group
    
    def intensive(n):
        gevent.sleep(3 - n)
        return 'task', n
    
    
    igroup = Group()
    for i in igroup.imap_unordered(intensive, xrange(3)):
        print(i)
    
 类似资料: