当前位置: 首页 > 工具软件 > eventlet > 使用案例 >

openstack基础之eventlet

田曜瑞
2023-12-01

eventlet是一个可以改变你代码运行方式的python并发网络库,同时不改变你编写代码的方式.

它使用高度可扩展的非阻塞I/O,epoll或者kqueue,libevent.

协程保证开发者使用类似线程的方式来编写非阻塞程序,同时保证了非阻塞I/O的好处.

内含事件驱动,意味着你可以很容易在python解释器中使用它,或者在大程序的某一部分使用它.

很容易开始使用eventlet,也容易将它应用于现有的程序.


举例

下面是一系列的使用eventlet的例子,它们就放在eventlet源码的example目录下.


网络爬虫

#!/usr/bin/env python
"""
This is a simple web "crawler" that fetches a bunch of urls using a pool to
control the number of outbound connections. It has as many simultaneously open
connections as coroutines in the pool.

The prints in the body of the fetch function are there to demonstrate that the
requests are truly made in parallel.
"""
import eventlet
from eventlet.green import urllib2


urls = [
    "https://www.google.com/intl/en_ALL/images/logo.gif",
    "http://python.org/images/python-logo.gif",
    "http://us.i1.yimg.com/us.yimg.com/i/ww/beta/y3.gif",
]


def fetch(url):
    print("opening", url)
    body = urllib2.urlopen(url).read()
    print("done with", url)
    return url, body


pool = eventlet.GreenPool(200)
for url, body in pool.imap(fetch, urls):
    print("got body from", url, "of length", len(body))

WSGI服务器

"""This is a simple example of running a wsgi application with eventlet.
For a more fully-featured server which supports multiple processes,
multiple threads, and graceful code reloading, see:

http://pypi.python.org/pypi/Spawning/
"""

import eventlet
from eventlet import wsgi


def hello_world(env, start_response):
    if env['PATH_INFO'] != '/':
        start_response('404 Not Found', [('Content-Type', 'text/plain')])
        return ['Not Found\r\n']
    start_response('200 OK', [('Content-Type', 'text/plain')])
    return ['Hello, World!\r\n']

wsgi.server(eventlet.listen(('', 8090)), hello_world)


Echo服务器

#! /usr/bin/env python
"""\
Simple server that listens on port 6000 and echos back every input to
the client.  To try out the server, start it up by running this file.

Connect to it with:
  telnet localhost 6000

You terminate your connection by terminating telnet (typically Ctrl-]
and then 'quit')
"""
from __future__ import print_function

import eventlet


def handle(fd):
    print("client connected")
    while True:
        # pass through every non-eof line
        x = fd.readline()
        if not x:
            break
        fd.write(x)
        fd.flush()
        print("echoed", x, end=' ')
    print("client disconnected")

print("server socket listening on port 6000")
server = eventlet.listen(('0.0.0.0', 6000))
pool = eventlet.GreenPool()
while True:
    try:
        new_sock, address = server.accept()
        print("accepted", address)
        pool.spawn_n(handle, new_sock.makefile('rw'))
    except (SystemExit, KeyboardInterrupt):
        break

Socket连接

"""Spawn multiple workers and collect their results.

Demonstrates how to use the eventlet.green.socket module.
"""
from __future__ import print_function

import eventlet
from eventlet.green import socket


def geturl(url):
    c = socket.socket()
    ip = socket.gethostbyname(url)
    c.connect((ip, 80))
    print('%s connected' % url)
    c.sendall('GET /\r\n\r\n')
    return c.recv(1024)


urls = ['www.google.com', 'www.yandex.ru', 'www.python.org']
pile = eventlet.GreenPile()
for x in urls:
    pile.spawn(geturl, x)

# note that the pile acts as a collection of return values from the functions
# if any exceptions are raised by the function they'll get raised here
for url, result in zip(urls, pile):
    print('%s: %s' % (url, repr(result)[:50]))

基本用法

Eventlet是围绕着green threads(也就是协程,green threads是eventlet使用的术语)这个概念构建的,它们被启动用于网络相关的工作.

green threads和普通的线程相比,主要在以下2方面不同:

  • green thread开销很小,你不需要像使用普通线程那样节约.一般情况下,一个网络连接至少由一个green thread来处理.
  • green thread间彼此协作,通过主动让出执行权让其它green thread运行,而不是通过调度器被动地被抢占执行.这样的主要好处是对共享数据的方法不需要加锁,因为只有当前运行的green thread通过yield显式地放弃运行后,另外一个green thread才能对共享数据进行访问.也可以通过检查队列等原语来查看是否有未处理的数据.


主要API

Eventlet API的设计目标是简单可靠.代码需要易读并且容易理解代码的用途.较少行的代码优于过于聪明取巧的实现.就像python一样,在Eventlet中应该有且只有一种方法来实现我们的目标.

尽管Eventlet有很多模块,最常用的使用Eventlet的方法是简单的"import eventlet".下面是Eventlet模块一些可用功能的快速总结,里面含有更详细的文档链接.


Greenthread Spawn

eventlet. spawn ( func*args**kw )

启动一个greenthread来调用fund函数.Spawn多个green threads将会并行地完成任务.spawn的返回值是一个greenthread.GreenThread对象, 可以通过它来检索函数fund的返回值.通过spawn来更详细地了解.

eventlet. spawn_n ( func*args**kw )

和spawn一样,但是不能知道函数是怎么终止的(是正确地返回了还是抛出了异常). spawn_n可以更快地执行.通过spawn_n来了解更多.

eventlet. spawn_after ( secondsfunc*args**kw )

在seconds秒后执行func,是spawn()函数的延迟版. 要想终止spawn并阻止fund被调用,可以在spawn_after()的返回值上调用GreenThread.cancel() . 通过spawn_after来了解更多.


Greenthread Control


eventlet. sleep ( seconds=0 )

挂起当前green thread允许其它green thread有机会运行. 通过sleep 了解更多.

class  eventlet. GreenPool

控制并发的池.在应用程序中想要使用有限的内存或者限制部分代码打开的连接数,或者想要对不可预知的数据输入提供统一的界面.GreenPool提供了这样的控制.通过 GreenPool来了解如何使用它.

class  eventlet. GreenPile

GreenPile对象代表了大块的工作. 本质上,一个GreenPile是一个被工作填充的迭代器,可以在稍后进行结果读取.c通过GreenPile来了解更多.

class  eventlet. Queue

Queues是数据和执行单元通信的基础设施.Eventlet的队列类是greenthreads间用来通信的,并且为此提供了很多有用的特性.查看Queue来了解更多.

class  eventlet. Timeout

这个类提供了一种方法来为任何代码的执行添加超时时间.在超时后会在当前greenthread中抛出异常. 如果忽略了异常参数或者传递None, 则Timeout实例会被抛出.

Timeout对象实现了context管理器,所以通过"with“表达式来使用.通过Timeout来了解更多细节.

Patching Functions

eventlet. import_patched ( modulename*additional_modules**kw_additional_modules )

确保标准库模块以"green"版本导入,这样所有事情都以非阻塞方式工作.唯一需要的参数是要导入的模块名称. 更多的信息请查看Import Green.

eventlet. monkey_patch ( all=Trueos=Falseselect=Falsesocket=Falsethread=Falsetime=False )

全局打补丁可以使必要的系统模块greenthread友好. 关键字参数对被打补丁的模块进行一些控制.

如果all是True,则会忽略所有其它参数并对所有的模块都打上补丁.如果是False,则标准库的特定子部分由剩余的关键字参数控制如何打补丁.大部分的单一模块补丁都有相同的名称(os,time,select).socket是个例外,同时会对ssl模块打补丁;还有thread会对thread,threading和Queue都打上补丁.多次调用monkey_patch是安全的.更多的细节请查看 Monkeypatching the Standard Library.

Network Convenience Functions

eventlet. connect ( addrfamily=<AddressFamily.AF_INET: 2>bind=None )

打开客户端sockets的便捷函数.

参数:
  • addr – 要连接的服务器地址.对于TCP sockets,是一个(host,port)元组.
  • family – Socket协议族, 可选的. 可用的地址协议族参考socket文档.
  • bind – 要绑定的本地端口, 可选的.
返回值:

连接成功的green socket对象.

eventlet. listen ( addrfamily=<AddressFamily.AF_INET: 2>backlog=50 )

打开服务端sockets的便捷函数. 这个socket可以被用在serve()或者典型的 accept()循环中.

在socket上设置SO_REUSEADDR 来避免麻烦.

Parameters:
  • addr – 监听的地址. 对于TCP sockets, 是一个(host, port) 元组.
  • family – Socket 协议族, 可选的. 可用的地址协议族参考socket文档.
  • backlog – 连接队列的最大值,最小为1; 最大值依赖于系统实现.
Returns:

正在监听的green socket对象.

eventlet. wrap_ssl ( sock*a**kw )

将普通的socket转换为SSL socket的便捷函数. 还有一个相同的接口ssl.wrap_socket(),但是也可以使用PyOpenSSL. Though,注意,它忽略了cert_reqsssl_versionca_certsdo_handshake_on_connect, 和suppress_ragged_eofs 参数当使用了PyOpenSSL.

通常的做法是直接在创建函数上使用wrap_ssl, 比如., wrap_ssl(connect(addr)) 或者 wrap_ssl(listen(addr),server_side=True). 使用这种方式就不会有未包装的socket无意中破坏SSL会话了.

:返回Green SSL 对象.

eventlet. serve ( sockhandleconcurrency=1000 )

在指定的socket上运行服务器.对每个客户端连接都运行一个单独的greenthread并调用handle函数处理. handle有2个参数: 客户端socket对象和客户端地址:

def myhandle(client_sock, client_addr):
    print("client connected", client_addr)

eventlet.serve(eventlet.listen(('127.0.0.1', 9999)), myhandle)

从handle返回并关闭客户端连接.

serve() 函数会阻塞当前的green thread;它在server运行完成之前不会返回. 如果你需要直接返回,spawn一个新的green thread来运行server().

handle中任何未捕获的异常都会从server()中抛出,并终止server,所以小心处理你的应用可以抛出的异常. handle的返回值会被忽略.

抛出StopServe 异常来优雅地终止server---这是唯一使server()函数返回而不抛出异常的方法.

concurrency 的值用于控制处理请求可以使用的greenthreads的最大数量. 如果server达到了这个上限,它将会停止accepting新的连接直到现有的某个请求处理完成为止.

class  eventlet. StopServe

用于优雅地终止server()的异常类.

这些就是Eventlet的基本原语;其它Eventlet模块中还有许多原语: Module Reference.


Design Patterns

Eventlet的使用由很多基本的模式构成.这里是一些展示它们基本结构的例子.

Client Pattern

典型的客户端例子是一个网络爬虫. 有一个需要下载的urls列表,并在后续处理中处理它们返回的body.下面是一个很简单的例子:

import eventlet
from eventlet.green import urllib2

urls = ["http://www.google.com/intl/en_ALL/images/logo.gif",
       "https://www.python.org/static/img/python-logo.png",
       "http://us.i1.yimg.com/us.yimg.com/i/ww/beta/y3.gif"]

def fetch(url):
    return urllib2.urlopen(url).read()

pool = eventlet.GreenPool()
for body in pool.imap(fetch, urls):
    print("got body", len(body))

还有一个稍微复杂的爬虫版本web crawler example. 这个例子中有一些值得解释的代码.

from eventlet.green import urllib2 是你如何导入urllib2的yield协作版本. 它的使用和标准的版本看起来完全相同,除了它使用green sockets用于通信.这里是一个导入Green模式的例子.

pool = eventlet.GreenPool() 构造了一个由1000个green threads构成的GreenPool. 使用一个pool是一个好的习惯,因为它限制了爬虫同时进行工作的最大数量,当输入数据变化很大时这很有用.

for body in pool.imap(fetch, urls): 并行调用fetch函数并使用迭代器访问结果.  map使调用的函数并行执行,并以它们执行的顺序返回结果.

客户端编程模式的关键点是处理收集每个函数调用的结果; 每个fetch函数并行执行本质上是一种看不见的优化. 注意map是有内存上限的,如果urls的列表增长到几万,它不会消息几千M的内存(是的,我们曾经在生产环境中遇到这种总是!).

Server Pattern

这里是一个简单的服务端的例子,一个简单的ech服务器:

import eventlet

def handle(client):
    while True:
        c = client.recv(1)
        if not c: break
        client.sendall(c)

server = eventlet.listen(('0.0.0.0', 6000))
pool = eventlet.GreenPool(10000)
while True:
    new_sock, address = server.accept()
    pool.spawn_n(handle, new_sock)

这里有一个更健壮和复杂的版本echo server example.

server = eventlet.listen(('0.0.0.0', 6000))很方便地创建了一个监听socket.

pool = eventlet.GreenPool(10000) 创建了一个green threads池能处理一万个客户端.

pool.spawn_n(handle, new_sock) 启动一个green thread来处理新的客户端连接. accept循环并不关心handle函数的返回值,所以这里使用spawn_n而不是spawn.

server和client编程模式的不同点归结起来是:server有一个while循环来反复调用accept()然后将客户端socket完全交给handle()方法处理,而不是收集它们的结果.

Dispatch Pattern

一个常见的应用案例是Linden Lab始终以一个“dispatch” 设计模式在运行. 这是一个服务器同时也是一些其它服务的客户端. Proxies, aggregators, job workers, 等等都适用于这里. 这就是GreenPile设计使用的场景.

 这里有个例子: 一个server从客户端收到包含了RSS订阅的urls列表的POSTs. 然后server 并发地fetch所有的订阅然后向客户端回应一个标题的列表. 也很容易想象出更复杂的应用场景,可以很容易地将它改造成一个Reader-style 的应用:

import eventlet
feedparser = eventlet.import_patched('feedparser')

pool = eventlet.GreenPool()

def fetch_title(url):
    d = feedparser.parse(url)
    return d.feed.get('title', '')

def app(environ, start_response):
    pile = eventlet.GreenPile(pool)
    for url in environ['wsgi.input'].readlines():
        pile.spawn(fetch_title, url)
    titles = '\n'.join(pile)
    start_response('200 OK', [('Content-type', 'text/plain')])
    return [titles]

例子的完整版本在Feed Scraper, 里面包括了如果在指定端口启动WSGI服务器.

这个例子使用一个全局的GreenPool来控制并发度. 如果我们没有一个全局的限制来控制正在处理的请求数,客户端就能够引起server同时打开上万个到外部服务器的连接,这样就会引起抓取的IP被禁止和其它各种各样的恶意意图. 池不能完全地防止DoS攻击,但是能极大地减少DoS.

值得注意的是下面的代码:

1
2
3
4
pile = eventlet.GreenPile(pool)
for url in environ['wsgi.input'].readlines():
    pile.spawn(fetch_title, url)
titles = '\n'.join(pile)

注意第1行, Pile用全局的pool作为参数来构造. 这样就将Pile的并发度与全局绑定了. 如果已经有来自其它客户端的1000个fetchs, 它将会阻塞到有一些完成之后在处理. 增加限制是一个好习惯!

第3行只是调用spawn,但是不会存储任何它的返回值. 这是因为返回值被Pile自己保持着. 这点可以在下面的代码中很明显地看出来...

第4行我们实际使用Pile作为一个iterator. iterator中的每个元素是fetch_title的返回值(是strings).我们可以使用通常的python用法 (join()) 来将这些增量联系起来.


Greening The World

开发一个类似Eventlet这样的库的最大挑战是内置的网络库并非原生地支持我们需要的那些协作的yield. 我们必须要做的是对标准库的关键地方打补丁来使它们支持协作yield.我们曾经考虑在导入Eventlet时自动地做这些, 但是我们决定不做这些因为这是不pythonic的,导入了模块B却改变了模块A的行为.

这样,使用Eventlet的应用程序自己必须显式地green world.使用提供的一个或所有的便捷方式.

IImport Green

第一种greening应用程序的方法是从eventlet.green包中导入想着的网络库. 它包含了和标准库一样接口的库, 但是它们已经被修改成了能与green threads很好工作的模块. 使用这种方式是一个很好的工程实践,因为实际的依赖项在每个文件中都很明显:

from eventlet.green import socket
from eventlet.green import threading
from eventlet.green import asyncore

这种方式能够很好的工作如果每个库都能被green导入.但是eventlet.green 缺少一些模块(比如, 非标准的python模块), 这时候import_patched()函数就能够拯救你了. 它是内置import的一个替代可以导入时green任何模块.

eventlet.patcher. import_patched ( module_name*additional_modules**kw_additional_modules )

以green方式导入一个模块, 这样这个模块中使用网络库中socket的部分将会使用Eventlet的green版本. 唯一需要的参数是需要导入的模块名称:

import eventlet
httplib2 = eventlet.import_patched('httplib2')

在底层, 它暂时地将sys.modules中的标准版本替换为eventlet.green中的等价版本.当对导入的模块打完补丁后再恢复原来的sys.modules.这样,如果被打补丁的模块中含有'import socket‘就会被import_patched替换对eventlet.green.socket的引用.这种方式的弱点是不支持延迟绑定(导入发生在运行时). 通常很少做延迟绑定 (它很慢并且违反PEP-8), 所以在大多数情况下import_patched工作的很好.

import_patched的另一方面是能够准备地指定需要打补丁的模块. 这样可能会有轻微地性能提升因为只对需要的模块进行导入, 而没有参数的import_patched会导入一批模块以防止需要它们. additional_modules 和 kw_additional_modules 参数都是name/module对的序列. 可以单独或同时使用它们:

from eventlet.green import socket
from eventlet.green import SocketServer
BaseHTTPServer = eventlet.import_patched('BaseHTTPServer',
                        ('socket', socket),
                        ('SocketServer', SocketServer))
BaseHTTPServer = eventlet.import_patched('BaseHTTPServer',
                        socket=socket, SocketServer=SocketServer)

Monkeypatching the Standard Library

另一种greening应用程序的方法是简单地对标准库打猴子补丁. 这会看起来像变魔法一样,但是能避免延迟绑定的问题.

eventlet.patcher. monkey_patch ( os=Noneselect=Nonesocket=Nonethread=Nonetime=Nonepsycopg=None )

这个函数通过替换关键系统模块中的关键元素来打猴子补丁. 如果不指定参数,所有的东西都会被打补丁:

import eventlet
eventlet.monkey_patch()

关键字参数用来控制模块如何被打补丁, 通常这很重要. 大部分名字会对单一模块打补丁 (比如. time=True意味着time模块被打上了补丁[time.sleep被eventlet.sleep打补丁]). 这个规则的例外是socket, 同时会对ssl模块打补丁; 还有thread, 会对threadthreading,和Queue打补丁.

下面是一个只对部分模块打猴子补丁的例子:

import eventlet
eventlet.monkey_patch(socket=True, select=True)

在应用程序中尽可能早地调用monkey_patch()是很重要的. 最好在main模块的第一行做. 这样做的原因是:有时候继承自另外一个类的类需要被greened– 比如一个类继承自socket.socket – 继承在导入模块时完成, 所以猴子补丁操作需要在继承定义之前完成. 多次调用 monkey_patch是安全的.

psycopg猴子补丁依赖Daniele Carrazzo的 green psycopg2 分支; 更多地信息请参考the announcement.

eventlet.patcher. is_monkey_patched ( module )

返回指定的模块是否已经打上了猴子补丁. module参数可以是模块对象或者模块的名字.

完全基于模块的名称,所以如果你用import关键字以外的其它方法导入一个模块(包括  import_patched()), is_monkey_patched可能对指定的模块返回不正确的结果.

 类似资料: