我们在上文《Rust网络编程框架-Tokio进阶》介绍了async/await和锁的基本用法,并完成了一个Server端的DEMO代码。本文继续来探讨这个话题。
上文中依靠telnet来触发服务端代码的执行,本文我们将自己实现一个客户端。由于笔者也没有从之前比如GO、JAVA等语言的套路中完全走出来,我最初的实现是这样的
#[tokio::main]async fn main() {
let mut client = client::connect("127.0.0.1:6379").await.unwrap();
// 生成一个读取key的任务
let t1 = tokio::spawn(async {
let res = client.get("hello").await;
});
// 生成一个设置key的任务
let t2 = tokio::spawn(async {
client.set("foo", "bar".into()).await;
});
t1.await.unwrap();
t2.await.unwrap();
}
但是以上代码根本就无法编译,因为tokio任务T1和T2都需要使用client,但是client并没有像上文中Arc::<Mutex::<HashMap>>一样实现copy方法,你还不能clone一个client分别给t1和t2使用,当然我们可以使用Mutex来解决任务之间的矛盾问题,但正如我们上文所说互斥锁的最大问题就是在同一时刻只能有一个任务执行到被加锁的关键代码,这样做法的效率又是问题。
使用channel管道进行消息传递,其实就是我们在并发编程框架中常用的生产者消费者模式。这个设计模式在本例当中其实就是生成两个任务,一个专门用来产生消息,另一个专门用来向服务端发送消息,channel管道其实就是一个消息的缓冲区,在发送任务繁忙时,产生的消息其实都在消息队列中缓冲,一旦有发送任务缓过劲来,就可以从管道里取新消息进行发送,与Mutex的互斥锁方案相比,channel管理的方式明显可以做得更大的性能与吞吐量。
在Tokio中提供以下四种管道的工作模式
Mpsc:Multi-Producer,Single-Consumer,也就是多生产者,单一消费者模式。
Oneshot:单一模式,也就是单一生产者,单一消费模式。
广播(broadcast)模式:Multi-producer multi-consumer。多生产者,多消费者的多对多模式。
观察(watch)模式:Single-Producer,multi-consumer。单生产者,多消费者的模式,这个模式与其它模式略有不同,每个接收者都只能看到最近的值。
这里笔者要特别提示大家,注意Tokio当中的channel管道与Rust原生channel和crossbeam提供的Channel不是同一个概念,Tokio中对于消费者来说,调用recv API返回的还是一个Future对象,recv接收消息操作并不会阻塞进程,这也是Tokio设计的一贯风格。以MPSC为例,使用样例如下:
use tokio::sync::mpsc;
#[tokio::main]async fn main() {
let (tx, mut rx) = mpsc::channel(32);
let tx2 = tx.clone();//clone之后可以将channel指派给不同任务
tokio::spawn(async move {
tx.send("sending from first handle").await;//必须调用await才会阻塞
});
tokio::spawn(async move {
tx2.send("sending from second handle").await;
});
while let Some(message) = rx.recv().await {
println!("GOT = {}", message);
}
}
使用管道方式完整的客户端代码及注释如下:
use tokio::sync::mpsc;
use mini_redis::client;
use mini_redis::Command::*;
use bytes::Bytes;
//先定义redis的命令类型
#[derive(Debug)]
enum Command {
Get {
key: String,
},
Set {
key: String,
val: Bytes,
}
}
#[tokio::main]
async fn main() {
//首先建立MPSC模式的通道
let (tx, mut rx) = mpsc::channel(32);
//消费者允许多个,可以克隆
let tx2 = tx.clone();
//t1任务执行get操作
let t1 = tokio::spawn(async move {
let cmd = Command::Get {
key: "hello".to_string(),
};
tx.send(cmd).await.unwrap();
});
//t2任务执行set操作
let t2 = tokio::spawn(async move {
let cmd = Command::Set {
key: "foo".to_string(),
val: "bar".into(),
};
tx2.send(cmd).await.unwrap();
});
//manager任务是消费者,接收消息,并向服务端发送信息。
let manager = tokio::spawn(async move {
let mut client = client::connect("127.0.0.1:6379").await.unwrap();
while let Some(cmd) = rx.recv().await {
use Command::*;
match cmd {
Get { key } => {
client.get(&key).await;
println!("get command send");
}
Set { key, val } => {
client.set(&key, val).await;
println!("set command send");
}
}
}
});
t1.await.unwrap();
t2.await.unwrap();
manager.await.unwrap();
}
客户端执行结果如下:
get command send
set command send
注意:客户端需要在服务端启动的情况下才能运行,完整的服务端代码如下:
use tokio::net::TcpListener;
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use tokio::net::TcpStream;
use mini_redis::{Connection, Frame};
use bytes::Bytes;
type Db =Arc<std::sync::Mutex<HashMap<String, Bytes>>>;
#[tokio::main]
async fn main() {
let listener = TcpListener::bind("127.0.0.1:6379").await.unwrap();
println!("Listening");
let db = Arc::new(Mutex::new(HashMap::new()));
loop {
let (socket, _) = listener.accept().await.unwrap();
// Clone the handle to the hash map.
let db = db.clone();
println!("Accepted");
tokio::spawn(async move {
process(socket, db).await;
});
}
}
async fn process(socket: TcpStream, db: Db) {
use mini_redis::Command::{self, Get, Set};
let mut connection = Connection::new(socket);
while let Some(frame) = connection.read_frame().await.unwrap() {
let response = match Command::from_frame(frame).unwrap() {
Set(cmd) => {
let mut db = db.lock().unwrap();
println!("set command got");
db.insert(cmd.key().to_string(), cmd.value().clone());
Frame::Simple("OK".to_string())
}
Get(cmd) => {
let db = db.lock().unwrap();
println!("get command got");
if let Some(value) = db.get(cmd.key()) {
Frame::Bulk(value.clone())
} else {
Frame::Null
}
}
cmd => panic!("unimplemented {:?}", cmd),
};
// Write the response to the client
connection.write_frame(&response).await.unwrap();
}
}
Tokio中对于I/O的读写操作方式与标准Rust的API基本相同,只是Tokio的读写都是异步的,在使用Tokio的读(AsyncRead)和写(AsyncWrite)等API,必须与.await一起使用,才能阻塞。比如下列代码是肯定不能编译通过的。
use tokio::fs::File;
use tokio::io::{self, AsyncReadExt};
#[tokio::main]
async fn main() -> io::Result<()> {
let mut f = File::open("beyondma.txt");
let mut buffer = [0; 10];
// read up to 10 bytes
let n = f.read(&mut buffer[..]);
println!("The bytes: {:?}", &buffer[..n]);
Ok(())
}
上述代码需要进行修改才能运行,如下:
use tokio::fs::File;
use tokio::io::{self, AsyncReadExt};
#[tokio::main]
async fn main() -> io::Result<()> {
let mut f = File::open("beyondma.txt").await?;
let mut buffer = [0; 10];
// read up to 10 bytes
let n = f.read(&mut buffer[..]).await?;
println!("The bytes: {:?}", &buffer[..n]);
Ok(())
}
另外注意:当read()返回Ok(0)时,表示Stream已经关闭,对于TcpStream实例,Ok(0)代表socket已关闭,如果代码运行在一个循环当中,此时应该退出循环。
在上一节的示例代码中,对于socket的读写都是由一个任务完成的,为了通过读写分离,来达到更高效率的,我们必须将TcpStream拆分为读和写两个handle。对于tokio的框架来看,读写分享使用io::split来实现。例程如下:
use tokio::io::{self, AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpListener;
#[tokio::main]
async fn main() -> io::Result<()> {
let listener = TcpListener::bind("127.0.0.1:6379").await.unwrap();
loop {
let (mut socket, _) = listener.accept().await?;
tokio::spawn(async move {
let mut buf = vec![0; 1024];
loop {
match socket.read(&mut buf).await {
//记住ok(0)需要直接返回
Ok(0) => return,
Ok(n) => {
// Copy the data back to socket
if socket.write_all(&buf[..n]).await.is_err() {
return;
}
}
Err(_) => {
return;
}
}
}
});
}
}
这是一个典型的回显输入的Echo Server,另外启动一个终端执行telnett 结果如下:
telnet 127.0.0.1 6380
Trying 127.0.0.1...
Connected to 127.0.0.1.
Escape character is '^]'.
skldjsfl
skldjsfl
sksdkj
sksdkj