当前位置: 首页 > 工具软件 > async-std > 使用案例 >

3.教程:使用async-std编写聊天服务(3.3 接收消息)

张岳
2023-12-01

                            3.3  接收消息
 
让我们实现接收部分的协议。我们需要:
TcpStream并将字节按分割符\n拆分后解码为utf-8
将第一行解释为登录
将其余行解析为login:message

use async_std::{
    io::BufReader,
    net::TcpStream,
};

async fn accept_loop(addr: impl ToSocketAddrs) -> Result<()> {
    let listener = TcpListener::bind(addr).await?;
    let mut incoming = listener.incoming();
    while let Some(stream) = incoming.next().await {
        let stream = stream?;
        println!("Accepting from: {}", stream.peer_addr()?);
        let _handle = task::spawn(connection_loop(stream)); // 1
    }
    Ok(())
}

async fn connection_loop(stream: TcpStream) -> Result<()> {
    let reader = BufReader::new(&stream); // 2
    let mut lines = reader.lines();

    let name = match lines.next().await { // 3
        None => Err("peer disconnected immediately")?,
        Some(line) => line?,
    };
    println!("name = {}", name);

    while let Some(line) = lines.next().await { // 4
        let line = line?;
        let (dest, msg) = match line.find(':') { // 5
            None => continue,
            Some(idx) => (&line[..idx], line[idx + 1 ..].trim()),
        };
        let dest: Vec<String> = dest.split(',').map(|name| name.trim().to_string()).collect();
        let msg: String = msg.to_string();
    }
    Ok(())
}
  1. 我们使用task::spawn函数生成一个独立的任务来处理每个客户机。也就是说,在接受客户端之后,accept_loop立即开始等待下一个循环。这是事件驱动架构的核心好处:我们可以同时为许多客户机提供服务,而不需要花费许多硬件线程。
  2. 幸运的是,“将字节流拆分成行”功能已经实现。.lines()调用返回字符串的流。
  3. 我们得到第一行--登录
  4. 再次,我们实现了一个手动异步for循环。
  5. 最后,我们将每一行解析为目标登录名和消息本身的元组。

3.3.1 管理错误

上述解决方案中的一个严重问题是,当我们在connection_loop中正确地传播错误时,我们只是在随后将错误丢弃了!
也就是说,task::spawn不会立即返回错误(它不能,它需要先运行future才能完成),
只有在它加入之后。我们可以通过waiting任务加入来“修复”,如下所示:


let handle = task::spawn(connection_loop(stream));
handle.await

     上面使用 handler.await等待直到客户完成,然后再配合使用?操作符来抛出错误结果。使用这个解决方案有两个问题!

     首先,因为我们一直等待客户端,所以一次只能处理一个客户机,这完全违背了使用异步的目的!
      其次,如果客户端遇到IO错误,则整个服务器立即退出。也就是说,一个不稳定的互联网连接的一个异常毁掉了整个聊天室!
      在这种情况下,处理客户端错误的正确方法是记录它们,然后继续为其他客户端服务。所以让我们使用一个helper函数:

fn spawn_and_log_error<F>(fut: F) -> task::JoinHandle<()>
where
    F: Future<Output = Result<()>> + Send + 'static,
{
    task::spawn(async move {
        if let Err(e) = fut.await {
            eprintln!("{}", e)
        }
    })
}

 

 类似资料: