各位老师,我是一个rust新手尝试使用tokio做一些实验。目前碰到一个问题,我模拟了一个客户端client.rs,想要他和服务端server.rs进行通讯。客户端发送一些信息给到服务端,进行处理后再把结果发回来。
目前的代码会出现客户端卡死,用try_read看了一下原因是WouldBlock错误,单发送和单接收都正常。一旦像代码中出现发送后再接收就会出问题。能否有大佬解释一下原因,谢谢。
client.rs
use clap::{Parser, ValueEnum};use serde::{Deserialize, Serialize};use tokio::{ io::{AsyncReadExt, AsyncWriteExt}, net::{TcpSocket, TcpStream},};type BoxedError = Box<dyn std::error::Error>;#[derive(Serialize, Deserialize)]struct RequestFrame { command: String, key: String, value: Option<String>,}impl RequestFrame { fn new(command: String, key: String, value: Option<String>) -> RequestFrame { RequestFrame { command: command, key: key, value: value, } }}#[derive(Parser, Debug)]#[command( version = "0.0.1", author = "haozhang", long_about = "this is a self-made redis client")]struct Args { #[arg(short, long, value_name = "command")] command: Command, #[arg(short, long, value_name = "key")] key: String, #[arg(short, long, value_name = "value")] value: Option<String>,}#[derive(Debug, Clone, ValueEnum)]enum Command { Get, Set,}async fn get_stream() -> Result<TcpStream, BoxedError> { let addr = "127.0.0.0:6379".parse().unwrap(); let socket = TcpSocket::new_v4().unwrap(); let stream = socket.connect(addr).await.unwrap(); Ok(stream)}#[tokio::main]async fn main() -> Result<(), BoxedError> { // let args = std::env::args().collect::<Vec<String>>(); let mut stream = get_stream().await?; let (mut reader, mut writer) = stream.split(); let args = Args::parse(); let command = match args.command { Command::Get => "get".to_string(), Command::Set => "set".to_string(), }; let key = args.key; let value = args.value; // println!("{:?}", command); // println!("{:?}", key); // println!("{:?}", value); let request = RequestFrame::new(command, key, value); let request_json_string = serde_json::to_string(&request)?; writer.write_all(request_json_string.as_bytes()).await?; writer.flush().await?; let mut buf = String::new(); reader.read_to_string(&mut buf).await?; // let mut buf = [0; 100]; // reader.try_read(&mut buf)?; println!("{:?}", buf); println!("done"); Ok(())}
server.rs
use tokio::{ io::{AsyncReadExt, AsyncWriteExt}, net::TcpListener,};type BoxedError = Box<dyn std::error::Error>;#[tokio::main]async fn main() -> Result<(), BoxedError> { let listener = TcpListener::bind("0.0.0.0:6379").await?; println!("listening on {}", listener.local_addr()?.port()); loop { let (socket, _) = listener.accept().await?; tokio::spawn(async { match process(socket).await { Ok(()) => (), Err(e) => eprintln!("process occurs an error: {e}") }; }); }}async fn process(mut socket: tokio::net::TcpStream) -> Result<(), BoxedError> { let (mut reader, mut writer) = socket.split(); let mut buf = String::new(); reader.read_to_string(&mut buf).await?; writer.write_all("we got the command, thanks".as_bytes()).await?; writer.flush().await?; println!("content: {}", buf); Ok(())}
用错方法了,server中的read_to_string是通过EOF作为结束标志的,这个一般是操作文件的。
这里需要通过read方法手动拼接结果
WouldBlock
错误在 Tokio 中通常意味着当前的异步操作还不能立即完成,需要等待一段时间或者其它异步操作触发之后才能完成。在你提供的代码中,这个问题最有可能出现在客户端的读取操作中。
在你的 client.rs
文件中,你在发送请求后立即尝试读取响应:
writer.write_all(request_json_string.as_bytes()).await?;writer.flush().await?;let mut buf = String::new();reader.read_to_string(&mut buf).await?; // 这里可能会返回 WouldBlock 错误
read_to_string
方法会一直尝试读取数据,直到遇到 EOF(文件结束符)或者发生错误。如果服务端还没有准备好发送数据,那么这个方法就会返回 WouldBlock
错误,因为当前没有数据可读。
解决这个问题的一个方法是使用 tokio::io::AsyncReadExt::read_until
或者其它可以处理 WouldBlock
错误的读取方法。但是,对于你的场景来说,你可能需要实现一个协议来确保客户端在读取之前知道服务端已经发送了数据。这通常涉及到发送和接收特定的消息头或协议控制消息,来通知对方数据已经准备好。
另一个方法是使用流(Stream)和异步迭代器(async iterator)来处理消息。Tokio 提供了一些工具,如 tokio::io::split
配合 tokio_util::codec
,可以帮助你更方便地处理这种情况。
这里有一个简化的例子,展示了如何使用 tokio_util::codec
中的 Framed
和 LinesCodec
来简化消息的读写过程:
use tokio_util::codec::{Framed, LinesCodec};use tokio::net::TcpStream;// 假设消息是以换行符分隔的文本行async fn send_and_receive(stream: TcpStream) -> Result<String, Box<dyn std::error::Error>> { let (mut reader, mut writer) = Framed::new(stream, LinesCodec::new()).split(); // 发送消息 writer.send("your_message_here").await?; // 读取响应 let response = reader.next().await.transpose()?; response.ok_or_else(|| std::io::Error::new(std::io::ErrorKind::UnexpectedEof, "unexpected EOF"))?}// 在你的 main 函数中使用 send_and_receive
请注意,这只是一个简化的例子,实际的应用中你可能需要定义自己的编解码器(Codec)来处理复杂的协议。
最后,请确保你的服务端代码在发送响应后,不要立即关闭连接,这样客户端才能读取到响应。在你的 server.rs
文件中,process
函数在发送完响应后就结束了,这可能会导致连接被关闭。你可能需要修改服务端代码以确保连接在发送完响应后仍然保持打开状态,直到客户端完成读取。
希望这些建议能帮助你解决问题!如果还有其他问题,请继续提问。
如下图,会在258行一直卡住。数据库数量小不会有问题。感觉是把数据都放到内存里了? 大数据量的access改怎么读取呢
我将main函数创建为tokio::main。在main函数中,我创建了两个异步任务,一个侦听通道,另一个触发C-API中的消息队列。如果消息可用,我希望通过回调函数上的通道发送它们,这样我就可以接收任务上的消息,在那里我正在监听事件。稍后,我希望通过SSE或GraphQL订阅将这些消息发送给几个客户机。 我不能更改C回调,因为它们需要被C-API通过,我必须使用回调,否则我不会得到消息。 我最新
问题内容: 我想在SLE4442智能卡中读取和写入数据,我有ACR38U-i1智能卡读卡器 对于写入,我使用此命令 并用于读取数据 两者都执行并发送SW = 9000,但没有一个数据收到响应APDU就像我写6262数据一样,但没有收到 我也在写命令和读命令之前使用Select命令 选择命令是 有没有人在SLE4442智能卡中读写正确的Java代码? 问题答案: 查看您的ACR读取器规格,并使用特定
有一个Merge数据结构,它允许“合并”两个同质流,并忘记出处。 是否存在一个流的代数逐点标记并,它从的流和的流生成的流?
我有以下问题:我试图从函数调用闭包,但是以后必须调用另一个函数。 我无法创建异步闭包,因为它们目前不稳定: 所以我得以某种方式这样做。 我发现了几个与该问题相关的问题,例如,但是当我试图实现它时,我收到了以下错误: 这里是游乐场链接,希望能显示我有什么问题。 我正在使用标题中所述的tokio。
题目描述 sql='select * from A',A表有8百万数据 前段时间写了数据库互相导数据的Python脚本,是Oracle导入postgreSQL,使用cx_Oracle执行execute(sql)没有任何问题。这次是postgreSQL导入postgreSQL,使用psycopg2执行execute(sql)就直接卡死在这一行了,并且内存占用持续上升。 自己的思路 数据库连接是没有问