Tokio教程之分帧
我们现在将应用我们刚刚学到的关于I/O的知识,实现 Mini-Redis 的分帧层。成帧是将一个字节流转换为一个分帧流的过程。一个帧是两个对等体之间传输的数据单位。Redis协议的帧定义如下。
use bytes::Bytes;
enum Frame {
Simple(String),
Error(String),
Integer(u64),
Bulk(Bytes),
Null,
Array(Vec<Frame>),
}
Copy
请注意,该分帧只由数据组成,没有任何语义。命令的解析和执行发生在更高的层次。
对于HTTP,分帧可能看起来像:
enum HttpFrame {
RequestHead {
method: Method,
uri: Uri,
version: Version,
headers: HeaderMap,
},
ResponseHead {
status: StatusCode,
version: Version,
headers: HeaderMap,
},
BodyChunk {
chunk: Bytes,
},
}
Copy
为了实现 Mini-Redis 的分帧,我们将实现一个 Connection 结构,它包裹着一个TcpStream并读写 mini_redis::Frame
值。
use tokio::net::TcpStream;
use mini_redis::{Frame, Result};
struct Connection {
stream: TcpStream,
// ... other fields here
}
impl Connection {
/// Read a frame from the connection.
///
/// Returns `None` if EOF is reached
pub async fn read_frame(&mut self)
-> Result<Option<Frame>>
{
// implementation here
}
/// Write a frame to the connection.
pub async fn write_frame(&mut self, frame: &Frame)
-> Result<()>
{
// implementation here
}
}
Copy
read_frame 方法在返回之前会等待一整帧的接收。对 TcpStream::read() 的一次调用可以返回一个任意数量的数据。它可能包含一整个帧,一个部分帧,或者多个帧。如果收到一个部分帧,数据被缓冲,并从套接字中读取更多数据。如果收到多个帧,则返回第一个帧,其余的数据被缓冲,直到下次调用 read_frame。
为了实现这一点,Connection需要一个读取缓冲区字段。数据从套接字中读入读取缓冲区。当一个帧被解析后,相应的数据就会从缓冲区中移除。
我们将使用BytesMut作为缓冲区类型。这是Bytes的一个可变版本。
use bytes::BytesMut;
use tokio::net::TcpStream;
pub struct Connection {
stream: TcpStream,
buffer: BytesMut,
}
impl Connection {
pub fn new(stream: TcpStream) -> Connection {
Connection {
stream,
// Allocate the buffer with 4kb of capacity.
buffer: BytesMut::with_capacity(4096),
}
}
}
Copy
以及Connection上的read_frame()函数:
use mini_redis::{Frame, Result};
pub async fn read_frame(&mut self)
-> Result<Option<Frame>>
{
loop {
if let Some(frame) = self.parse_frame()? {
return Ok(Some(frame));
}
// Ensure the buffer has capacity
if self.buffer.len() == self.cursor {
// Grow the buffer
self.buffer.resize(self.cursor * 2, 0);
}
// Read into the buffer, tracking the number
// of bytes read
let n = self.stream.read(
&mut self.buffer[self.cursor..]).await?;
if 0 == n {
if self.cursor == 0 {
return Ok(None);
} else {
return Err("connection reset by peer".into());
}
} else {
// Update our cursor
self.cursor += n;
}
}
}
Copy
在处理字节数组和 read 时,我们还必须维护一个游标,跟踪有多少数据被缓冲。我们必须确保将缓冲区的空部分传递给 read()。否则,我们会覆盖缓冲区的数据。如果我们的缓冲区被填满了,我们必须增加缓冲区,以便继续读取。在 parse_frame() 中(不包括),我们需要解析 self.buffer[..self.cursor]
所包含的数据。
因为将字节数组与游标配对是非常常见的,bytes
crate提供了一个代表字节数组和游标的抽象概念。Buf
特性是由可以读取数据的类型实现的。BufMut
特性是由可以写入数据的类型实现的。当把一个 T: BufMut
传递给 read_buf()
时,缓冲区的内部游标会被 read_buf
自动更新。正因为如此,在我们版本的read_frame中,我们不需要管理我们自己的游标。
此外,当使用 Vec<u8>
时,缓冲区必须被初始化。 vec![0; 4096]
分配了一个4096字节的数组,并将0写入每个条目。当调整缓冲区的大小时,新的容量也必须用零来初始化。初始化过程是不没有开销的。当使用 BytesMut
和 BufMut
时,容量是不初始化的。BytesMut
的抽象防止我们读取未初始化的内存。这让我们避免了初始化的步骤。
现在,让我们看一下parse_frame()函数。解析工作分两步进行。
mini-redis crate 为我们提供了一个用于这两个步骤的函数:
Frame::check
2Frame::parse
我们还将重用 Buf
的抽象来帮助。 Buf
被传入 Frame::check
。当检查函数遍历传入的缓冲区时,内部游标将被推进。当check返回时,缓冲区的内部游标指向帧的末端。
对于Buf类型,我们将使用 std::io::Cursor<&[u8]>
。
use mini_redis::{Frame, Result};
use mini_redis::frame::Error::Incomplete;
use bytes::Buf;
use std::io::Cursor;
fn parse_frame(&mut self)
-> Result<Option<Frame>>
{
// Create the `T: Buf` type.
let mut buf = Cursor::new(&self.buffer[..]);
// Check whether a full frame is available
match Frame::check(&mut buf) {
Ok(_) => {
// Get the byte length of the frame
let len = buf.position() as usize;
// Reset the internal cursor for the
// call to `parse`.
buf.set_position(0);
// Parse the frame
let frame = Frame::parse(&mut buf)?;
// Discard the frame from the buffer
self.buffer.advance(len);
// Return the frame to the caller.
Ok(Some(frame))
}
// Not enough data has been buffered
Err(Incomplete) => Ok(None),
// An error was encountered
Err(e) => Err(e.into()),
}
}
Copy
完整的 Frame::check
函数可以在这里找到。我们将不涉及它的全部内容。
需要注意的是,Buf的 “字节迭代器” 风格的API被使用。这些获取数据并推进内部游标。例如,为了解析一个帧,第一个字节被检查以确定该帧的类型。使用的函数是 Buf::get_u8
。这个函数获取当前光标位置的字节并使光标前进一个。
在Buf特性上还有更多有用的方法。查看API文档以了解更多细节。
分帧API的另一半是 write_frame(frame)
函数。这个函数将整个帧写到套接字中。为了尽量减少 write
系统调用,写将被缓冲。一个写缓冲区被维护,帧在被写入套接字之前被编码到这个缓冲区。然而,与 read_frame()
不同的是,在写入套接字之前,整个帧并不总是被缓冲到一个字节数组中。
考虑一个批量流帧。被写入的值是 Frame::Bulk(Bytes)
。散装帧的线格式是一个帧头,它由$字符和以字节为单位的数据长度组成。帧的大部分是Bytes值的内容。如果数据很大,把它复制到一个中间缓冲区将是很昂贵的。
为了实现缓冲写入,我们将使用 BufWriter
结构。这个结构被初始化为一个 T: AsyncWrite
并实现 AsyncWrite
本身。当在 BufWriter
上调用写时,写不会直接进入内部写器,而是进入一个缓冲区。当缓冲区满的时候,内容会被刷到内部写入器上,内部缓冲区被清空。还有一些优化措施,允许在某些情况下绕过缓冲区。
作为教程的一部分,我们将不尝试完整地实现 write_frame()
。请看这里的完整实现。
首先,Connection结构被更新。
use tokio::io::BufWriter;
use tokio::net::TcpStream;
use bytes::BytesMut;
pub struct Connection {
stream: BufWriter<TcpStream>,
buffer: BytesMut,
}
impl Connection {
pub fn new(stream: TcpStream) -> Connection {
Connection {
stream: BufWriter::new(stream),
buffer: BytesMut::with_capacity(4096),
}
}
}
Copy
接下来,write_frame()被实现:
use tokio::io::{self, AsyncWriteExt};
use mini_redis::Frame;
async fn write_frame(&mut self, frame: &Frame)
-> io::Result<()>
{
match frame {
Frame::Simple(val) => {
self.stream.write_u8(b'+').await?;
self.stream.write_all(val.as_bytes()).await?;
self.stream.write_all(b"\r\n").await?;
}
Frame::Error(val) => {
self.stream.write_u8(b'-').await?;
self.stream.write_all(val.as_bytes()).await?;
self.stream.write_all(b"\r\n").await?;
}
Frame::Integer(val) => {
self.stream.write_u8(b':').await?;
self.write_decimal(*val).await?;
}
Frame::Null => {
self.stream.write_all(b"$-1\r\n").await?;
}
Frame::Bulk(val) => {
let len = val.len();
self.stream.write_u8(b'$').await?;
self.write_decimal(len as u64).await?;
self.stream.write_all(val).await?;
self.stream.write_all(b"\r\n").await?;
}
Frame::Array(_val) => unimplemented!(),
}
self.stream.flush().await;
Ok(())
}
Copy
这里使用的函数是由 AsyncWriteExt
提供的。 它们在 TcpStream
上也是可用的,但是在没有中间缓冲区的情况下发出单字节的写入是不可取的。
write_u8
向写程序写一个字节。write_all
将整个片断写入写入器。write_decimal
是由 mini-redis 实现的。该函数以调用 self.stream.flush().await
结束。因为 BufWriter
将写入的数据存储在一个中间缓冲区中,对写入的调用并不能保证数据被写入套接字。在返回之前,我们希望帧被写到套接字中。对 flush()
的调用将缓冲区中的任何数据写到套接字。
另一个选择是不在 write_frame()
中调用 flush()
。相反,在 Connection
上提供一个 flush()
函数。这将允许调用者在写缓冲区中写入多个小帧的队列,然后用一个写系统调用将它们全部写入套接字。这样做会使 Connection API
复杂化。简化是 Mini-Redis
的目标之一,所以我们决定将 flush().await
调用包含在 fn write_frame()
中。