tornado 源码自带了丰富的 demo ,这篇文章主要分析 demo 中的聊天室应用: chatdemo
首先看 chatdemo 的目录结构:
├── chatdemo.py
├── static
│ ├── chat.css
│ └── chat.js
└── templates
├── index.html
├── message.html
└── room.html
非常简单,基本没有分层,三个模版一个 js 一个 css ,还有一个最重要的 chatdemo.py
本文的重点是弄清楚 chatdemo.py 的运行流程,所以对于此项目的其他文件,包括模版及 chat.js 的实现都不会分析,只要知道 chat.js 的工作流程相信对于理解 chatdemo.py 没有任何问题
此 demo 主要基于长轮询。 获取新消息的原理:
在 chat.js 中有一个定时器会定时执行 update 操作
当没有新消息时 tornado 会一直 hold 住 chat.js 发来的 update 请求
当有新消息时 tornado 将包含新消息的数据返回给所有 hold 的 update 请求
此时 chat.js 收到 update 回复后更新返回数据在聊天室中,同时再进行一次 update 请求, 然后又从 1. 开始执行。
发送新消息的原理:
输入消息, 点击 post 按钮, chat.js 获取表单后用 ajax 方式发送请求 new
tornado 收到请求 new ,返回消息本身, 同时通知所有 hold 住的 update 请求 ( 这里也包括发送 new 请求的 chat.js 所发送的 update 请求 ) 返回新消息
所有在线的 chat.js 收到 update 请求回复,更新返回信息到聊天室,同时再进行一次 update 请求。
清楚了以上流程,我们直接来看 chatdemo.py :
def main():
parse_command_line()
app = tornado.web.Application(
[
(r"/", MainHandler),
(r"/a/message/new", MessageNewHandler),
(r"/a/message/updates", MessageUpdatesHandler),
],
cookie_secret="__TODO:_GENERATE_YOUR_OWN_RANDOM_VALUE_HERE__",
template_path=os.path.join(os.path.dirname(__file__), "templates"),
static_path=os.path.join(os.path.dirname(__file__), "static"),
xsrf_cookies=True,
debug=options.debug,
)
app.listen(options.port)
tornado.ioloop.IOLoop.current().start()
if __name__ == "__main__":
main()
main 函数主要用作初始化应用、监听端口以及启动 tornado server 。
我们看路由:
主页对应 MainHandler
new 请求对应 MessageNewHandler
updates 请求对应 MessageUpdatesHandler
下面来看 MainHandler :
# Making this a non-singleton is left as an exercise for the reader.
global_message_buffer = MessageBuffer()
class MainHandler(tornado.web.RequestHandler):
def get(self):
self.render("index.html", messages=global_message_buffer.cache)
只有一行代码,就是渲染并返回 index.html,渲染的附加信息就是 global_message_buffer 的所有缓存消息。 global_message_buffer 是 MessageBuffer 的一个实例。 我们先不关心 MessageBuffer 内部是什么,现在我们只要记住它主要是用来储存聊天消息和连接到此聊天室的人的信息的类。 其中 MessageBuffer().cache 就是保存聊天室所有聊天消息的结构。
然后来看 MessageNewHandler :
class MessageNewHandler(tornado.web.RequestHandler):
def post(self):
message = {
"id": str(uuid.uuid4()),
"body": self.get_argument("body"),
}
# to_basestring is necessary for Python 3's json encoder,
# which doesn't accept byte strings.
message["html"] = tornado.escape.to_basestring(
self.render_string("message.html", message=message))
if self.get_argument("next", None):
self.redirect(self.get_argument("next"))
else:
self.write(message)
global_message_buffer.new_messages([message])
同样很简单,从 post 信息里获取发来的新消息 ( body ) ,然后给消息分配一个唯一的 uuid,接着把这段消息渲染成一段 html ,然后 self.write(message)
返回这段 html, 同时给 global_message_buffer ( MessageBuffer 的实例 ) 添加这条新信息。 这里其实我更倾向于返回 json 之类的数据,这样更加直观和规范,可能写 demo 的人考虑到读者对 json 之类的协议可能不熟悉故而选择了返回渲染好的 html 直接让 chat.js append 到 index.html 里。
接着来看 MessageUpdatesHandler :
class MessageUpdatesHandler(tornado.web.RequestHandler):
@gen.coroutine
def post(self):
cursor = self.get_argument("cursor", None)
# Save the future returned by wait_for_messages so we can cancel
# it in wait_for_messages
self.future = global_message_buffer.wait_for_messages(cursor=cursor)
messages = yield self.future
if self.request.connection.stream.closed():
return
self.write(dict(messages=messages))
def on_connection_close(self):
global_message_buffer.cancel_wait(self.future)
重点就在这里,可以看到其内部的 post 方法被 gen.coroutine 修饰器修饰,也就是说这个 post 方法现在是 协程 ( coroutine ) 方式工作。 对于协程比较陌生的童鞋,你可以直接把它当作是单线程解决 io ( 网络请求 ) 密集运算被阻塞而导致低效率的解决方案。 当然这样理解协程还比较笼统,之后我会详细写一篇关于协程的文章,但在这里这样理解是没有问题的。
现在来看代码内容,首先获取 cursor ,一个用来标识我们已经获取的消息的指针,这样 tornado 就不会把你已经获取的消息重复的发给你。 然后调用 global_message_buffer.wait_for_messages(cursor=cursor)
获取一个 future 对象。 future 对象是 tornado 实现的一个特殊的类的实例,它的作用就是包含之后 ( 未来 ) 将会返回的数据,我们现在不用关心 Future() 内部如何实现,只要记住上面它的作用就行。 关于 Future 的解读我会放到阅读 Future 源码时讲。
然后看最关键的这句: messages = yield self.future
注意这里的 yield 就是 hold updates 请求的关键,它到这里相当于暂停了整个 post 函数 ( updates 请求被 hold )同时也相当于 updates 这次网络请求被阻塞,这个时候协程发挥作用,把这个函数暂停的地方的所有信息保存挂起,然后把工作线程释放,这样 tornado 可以继续接受 new、 updates 等请求然后运行相应的方法处理请求。
当有新的消息返回时,tornado 底层的 ioloop 实例将会调用 gen.send(value) 返回新消息( value )给每个被暂停的方法的 yield 处, 此时协程依次恢复这些被暂停的方法, 同时用获得的返回消息继续执行方法, 这时 messages = yield self.future
继续执行,messages 获得 yield 的返回值 value ( python 中调用 gen.send(value) 将会把 value 值返回到 yield 处并替换原前 yield 后的值 ),然后判断下用户是否已经离开,如果还在线则返回新消息。
明白了以上流程,我们最后来看 MessageBuffer:
class MessageBuffer(object):
def __init__(self):
self.waiters = set()
self.cache = []
self.cache_size = 200
def wait_for_messages(self, cursor=None):
# Construct a Future to return to our caller. This allows
# wait_for_messages to be yielded from a coroutine even though
# it is not a coroutine itself. We will set the result of the
# Future when results are available.
result_future = Future()
if cursor:
new_count = 0
for msg in reversed(self.cache):
if msg["id"] == cursor:
break
new_count += 1
if new_count:
result_future.set_result(self.cache[-new_count:])
return result_future
self.waiters.add(result_future)
return result_future
def cancel_wait(self, future):
self.waiters.remove(future)
# Set an empty result to unblock any coroutines waiting.
future.set_result([])
def new_messages(self, messages):
logging.info("Sending new message to %r listeners", len(self.waiters))
for future in self.waiters:
future.set_result(messages)
self.waiters = set()
self.cache.extend(messages)
if len(self.cache) > self.cache_size:
self.cache = self.cache[-self.cache_size:]
初始化方法中 self.waiters 就是一个等待新消息的 listener 集合 ( 直接理解成所有被 hold 住的 updates 请求队列可能更清晰 )
self.cache 就是储存所有聊天消息的列表,self.cache_size = 200
则定义了 cache 的大小 是存 200 条消息。
然后先来看简单的 new_messages:
遍历 waiters 列表,然后给所有的等待者返回新消息,同时清空等待者队列。 然后把消息加到缓存 cache 里,如果缓存大于限制则取最新的 200 条消息。这里只要注意到 future.set_result(messages)
就是用来给 future 对象添加返回数据 ( 之前被 yield 暂停的地方此时因为 set_result() 方法将会获得 "未来" 的数据 ) 这一点即可。
然后来看 wait_for_messages :
def wait_for_messages(self, cursor=None):
# Construct a Future to return to our caller. This allows
# wait_for_messages to be yielded from a coroutine even though
# it is not a coroutine itself. We will set the result of the
# Future when results are available.
result_future = Future()
if cursor:
new_count = 0
for msg in reversed(self.cache):
if msg["id"] == cursor:
break
new_count += 1
if new_count:
result_future.set_result(self.cache[-new_count:])
return result_future
self.waiters.add(result_future)
return result_future
首先初始化一个 Future 对象,然后根据 cursor 判断哪些消息已经获取了哪些还没获取,如果缓存中有对于这个 waiter 还没获取过的消息,则直接调用 set_result() 返回这些缓存中已有的但对于这个 waiter 来说是新的的数据。 如果这个 waiter 已经有缓存中的所有数据,那么就把它加到等待者队列里保持等待,直到有新消息来时调用 new_messages 再返回。
而最后一个 cancel_wait 就很简单了,当有用户退出聊天室时,直接从 self.waiters 中移除他所对应的等待者。
当明白了整个代码的运行流程后,我们可以基于这个简单的 demo 而写出更加丰富的例子,比如加入 session ,做登陆、做好友关系,做单聊做群聊等等。
chatdemo with room是我添加的一个简单功能,输入聊天室房号后再进行聊天,只有同一房间中的人才能收到彼此的消息。
以上就是鄙人对整个 chatdemo.py 的解读。 在阅读此 demo 时,我没有参考其他源码解读,只是通过阅读 tornado 底层的源码而得出的个人的理解,因此肯定会有很多理解不成熟甚至错误的地方,还望大家多多指教。
作者:rapospectre