当前位置: 首页 > 文档资料 > Tornado 用户手册 >

协程

优质
小牛编辑
136浏览
2023-12-01

Tornado 中推荐用 协程 来编写异步代码. 协程使用 Python 中的关键字 yield 来替代链式回调来实现挂起和继续程序的执行(像在 gevent 中使用的轻量级线程合作的方法有时也称作协程, 但是在 Tornado 中所有协程使用异步函数来实现的明确的上下文切换).

协程和异步编程的代码一样简单, 而且不用浪费额外的线程, . 它们还可以减少上下文切换 让并发更简单 .

Example:

from tornado import gen

@gen.coroutine
def fetch_coroutine(url):
    http_client = AsyncHTTPClient()
    response = yield http_client.fetch(url)
    # 在 Python 3.3 之前的版本中, 从生成器函数
    # 返回一个值是不允许的,你必须用
    #   raise gen.Return(response.body)
    # 来代替
    return response.body

Python 3.5: asyncawait

Python 3.5 引入了 asyncawait 关键字 (使用了这些关键字的函数通常被叫做 “native coroutines” ). 从 Tornado 4.3 开始, 在协程基础上你可以使用这些来代替 yield. 简单的通过使用 async def foo() 来代替 @gen.coroutine 装饰器, 用 await 来代替 yield. 文档的剩余部分还是使用 yield 来兼容旧版本的 Python, 但是 asyncawait 在可用时将会运行的更快:

async def fetch_coroutine(url):
    http_client = AsyncHTTPClient()
    response = await http_client.fetch(url)
    return response.body

await 关键字并不像 yield 更加通用. 例如, 在一个基于 yield 的协程中你可以生成一个列表的 Futures, 但是在原生的协程中你必须给列表报装 . 你也可以使用 将使用 yield 的任何东西转换成用 await 工作的形式.

虽然原生的协程不依赖于某种特定的框架 (例如. 它并没有使用像 或者 装饰器), 不是所有的协程都和其它程序兼容.这里有一个 协程运行器 在第一个协程被调用时进行选择, 然后被所有直接调用 await 的协程库共享. Tornado 协程运行器设计时就时多用途且可以接受任何框架的 awaitable 对象. 其它协程运行器可能会有更多的限制(例如, asyncio 协程运行器不能接收其它框架的协程). 由于这个原因, 我们推荐你使用 Tornado 的协程运行器来兼容任何框架的协程. 在 Tornado 协程运行器中调用一个已经用了asyncio协程运行器的协程,只需要用 tornado.platform.asyncio.to_asyncio_future 适配器.

他是如何工作的

一个含有 yield 的函数时一个 生成器 . 所有生成器都是异步的; 调用它时将会返回一个对象而不是将函数运行完成. @gen.coroutine 修饰器通过 yield 表达式通过产生一个 对象和生成器进行通信.

这是一个协程装饰器内部循环的额简单版本:

# Simplified inner loop of tornado.gen.Runner
def run(self):
    # send(x) makes the current yield return x.
    # It returns when the next yield is reached
    future = self.gen.send(self.next)
    def callback(f):
        self.next = f.result()
        self.run()
    future.add_done_callback(callback)

装饰器从生成器接收一个 对象, 等待 (非阻塞的) 完成, 然后 “解开” 将结果像 yield 语句一样返回给生成器. 大多数异步代码从不直接接触到 类, 除非 立即通过异步函数返回给 yield 表达式.

怎样调用协程

协程在一般情况下不抛出异常: 在 被生成时将会把异常报装进来. 这意味着正确的调用协程十分的重要, 否则你可能忽略很多错误:

@gen.coroutine
def divide(x, y):
    return x / y

def bad_call():
    # This should raise a ZeroDivisionError, but it won't because
    # the coroutine is called incorrectly.
    divide(1, 0)

近乎所有情况中, 任何一个调用协程自身的函数必须时协程, 通过利用关键字 yield 来调用. 当你在覆盖了父类中的方法, 请查阅文档来判断协程是否被支持 ( 文档中应该写到那个方法 “可能是一个协程” 或者 “可能返回一个 ”):

@gen.coroutine
def good_call():
    # yield will unwrap the Future returned by divide() and raise
    # the exception.
    yield divide(1, 0)

有时你并不想等待一个协程的返回值. 在这种情况下我们推荐你使用 , 这意味着 负责调用. 如果它失败了, 会在日志中记录调用栈:

# The IOLoop will catch the exception and print a stack trace in
# the logs. Note that this doesn't look like a normal call, since
# we pass the function object to be called by the IOLoop.
IOLoop.current().spawn_callback(divide, 1, 0)
最后, 在程序的最顶层, 如果 `.IOLoop` 没有正在运行, 你可以启动 , 运行协程, 然后通过

方法来停止 . 这通常被用来启动面向批处理程序的 main 函数:

# run_sync() doesn't take arguments, so we must wrap the
# call in a lambda.
IOLoop.current().run_sync(lambda: divide(1, 0))

协程模式

结合 callbacks

为了使用回调来代替 与异步代码进行交互, 讲这个调用报装在 中. 这将会在你生成的 对象中添加一个回调参数:

@gen.coroutine
def call_task():
    # Note that there are no parens on some_function.
    # This will be translated by Task into
    #   some_function(other_args, callback=callback)
    yield gen.Task(some_function, other_args)

调用阻塞函数

在协程中调用阻塞函数的最简单方法时通过使用 , 这将返回与协程兼容的 Futures

thread_pool = ThreadPoolExecutor(4)

@gen.coroutine
def call_blocking():
    yield thread_pool.submit(blocking_func, args)

并行

协程装饰器能识别列表或者字典中的 Futures ,并且并行等待这些 Futures:

@gen.coroutine
def parallel_fetch(url1, url2):
    resp1, resp2 = yield [http_client.fetch(url1),
                          http_client.fetch(url2)]

@gen.coroutine
def parallel_fetch_many(urls):
    responses = yield [http_client.fetch(url) for url in urls]
    # responses is a list of HTTPResponses in the same order

@gen.coroutine
def parallel_fetch_dict(urls):
    responses = yield {url: http_client.fetch(url)
                        for url in urls}
    # responses is a dict {url: HTTPResponse}

交叉存取技术

有时保存一个 比立刻yield它更有用, 你可以在等待它之前执行其他操作:

@gen.coroutine
def get(self):
    fetch_future = self.fetch_next_chunk()
    while True:
        chunk = yield fetch_future
        if chunk is None: break
        self.write(chunk)
        fetch_future = self.fetch_next_chunk()
        yield self.flush()

循环

因为在Python中无法使用 for 或者 while 循环 yield 迭代器, 并且捕获yield的返回结果. 相反, 你需要将循环和访问结果区分开来, 这是一个 Motor 的例子:

import motor
db = motor.MotorClient().test

@gen.coroutine
def loop_example(collection):
    cursor = db.collection.find()
    while (yield cursor.fetch_next):
        doc = cursor.next_object()

在后台运行

和通常的协程不同. 相反, 协程中 通过使用 可以包含 while True: 循环:

@gen.coroutine
def minute_loop():
    while True:
        yield do_something()
        yield gen.sleep(60)

# Coroutines that loop forever are generally started with
# spawn_callback().
IOLoop.current().spawn_callback(minute_loop)

有时可能会遇到一些复杂的循环. 例如, 上一个循环每 60+N 秒运行一次, 其中 Ndo_something() 的耗时.为了精确运行 60 秒,使用上面的交叉模式:

@gen.coroutine
def minute_loop2():
    while True:
        nxt = gen.sleep(60)   # Start the clock.
        yield do_something()  # Run while the clock is ticking.
        yield nxt             # Wait for the timer to run out.