Tokio教程之stream
流是一个数值的异步系列。它是 Rust 的 std::iter::Iterator
的异步等价物,由 Stream 特性表示。流可以在 async 函数中被迭代。它们也可以使用适配器进行转换。Tokio在 StreamExt
trait上提供了许多常见的适配器。
Tokio在一个单独的 tokio-stream
crate 中提供流支持:
tokio-stream = "0.1"
目前,Tokio 的 Stream工具存在于
tokio-stream
crate中。一旦 Stream 特性在 Rust 标准库中稳定下来,Tokio 的 Stream 工具将被移到 tokio crate 中。
目前,Rust编程语言不支持异步 for 循环。相反,流的迭代是通过与 StreamExt::next()
搭配的 while let
循环完成的。
use tokio_stream::StreamExt;
#[tokio::main]
async fn main() {
let mut stream = tokio_stream::iter(&[1, 2, 3]);
while let Some(v) = stream.next().await {
println!("GOT = {:?}", v);
}
}
Copy
像迭代器一样,next()
方法返回 Option<T>
,其中T是流的值类型。接收到 None 表示流的迭代已经结束。
让我们来看看使用 Mini-Redis 客户端的一个稍微复杂的例子。
完整的代码可以在这里找到。
use tokio_stream::StreamExt;
use mini_redis::client;
async fn publish() -> mini_redis::Result<()> {
let mut client = client::connect("127.0.0.1:6379").await?;
// Publish some data
client.publish("numbers", "1".into()).await?;
client.publish("numbers", "two".into()).await?;
client.publish("numbers", "3".into()).await?;
client.publish("numbers", "four".into()).await?;
client.publish("numbers", "five".into()).await?;
client.publish("numbers", "6".into()).await?;
Ok(())
}
async fn subscribe() -> mini_redis::Result<()> {
let client = client::connect("127.0.0.1:6379").await?;
let subscriber = client.subscribe(vec!["numbers".to_string()]).await?;
let messages = subscriber.into_stream();
tokio::pin!(messages);
while let Some(msg) = messages.next().await {
println!("got = {:?}", msg);
}
Ok(())
}
#[tokio::main]
async fn main() -> mini_redis::Result<()> {
tokio::spawn(async {
publish().await
});
subscribe().await?;
println!("DONE");
Ok(())
}
Copy
任务被派生出来,在 “number” 频道上向 Mini-Redis 服务器发布消息。然后,在主任务中,我们订阅 “number” 频道并显示收到的消息。
在订阅之后,into_stream()
被调用到返回的订阅者上。这将消耗 Subscriber ,返回一个 stream,在消息到达时产生消息。在我们开始迭代消息之前,请注意流是用 tokio::pin
pin 在栈上的。在一个流上调用 next()
需要流被 pin 住。into_stream()
函数返回的是一个没有 pin 的流,我们必须明确地 pin 它,以便对其进行遍历。
当一个 Rust 值在内存中不能再被移动时,它就被 “pin"了。被 pin 的值的一个关键属性是,指针可以被带到被 pin 的数据上,并且调用者可以确信该指针保持有效。这个特性被
async/await
用来支持跨.await
点借用数据。
如果我们忘记 pin 住流,就会出现这样的错误:
error[E0277]: `from_generator::GenFuture<[static generator@Subscriber::into_stream::{closure#0} for<'r, 's, 't0, 't1, 't2, 't3, 't4, 't5, 't6> {ResumeTy, &'r mut Subscriber, Subscriber, impl Future, (), std::result::Result<Option<Message>, Box<(dyn std::error::Error + Send + Sync + 't0)>>, Box<(dyn std::error::Error + Send + Sync + 't1)>, &'t2 mut async_stream::yielder::Sender<std::result::Result<Message, Box<(dyn std::error::Error + Send + Sync + 't3)>>>, async_stream::yielder::Sender<std::result::Result<Message, Box<(dyn std::error::Error + Send + Sync + 't4)>>>, std::result::Result<Message, Box<(dyn std::error::Error + Send + Sync + 't5)>>, impl Future, Option<Message>, Message}]>` cannot be unpinned
--> streams/src/main.rs:29:36
|
29 | while let Some(msg) = messages.next().await {
| ^^^^ within `tokio_stream::filter::_::__Origin<'_, impl Stream, [closure@streams/src/main.rs:22:17: 25:10]>`, the trait `Unpin` is not implemented for `from_generator::GenFuture<[static generator@Subscriber::into_stream::{closure#0} for<'r, 's, 't0, 't1, 't2, 't3, 't4, 't5, 't6> {ResumeTy, &'r mut Subscriber, Subscriber, impl Future, (), std::result::Result<Option<Message>, Box<(dyn std::error::Error + Send + Sync + 't0)>>, Box<(dyn std::error::Error + Send + Sync + 't1)>, &'t2 mut async_stream::yielder::Sender<std::result::Result<Message, Box<(dyn std::error::Error + Send + Sync + 't3)>>>, async_stream::yielder::Sender<std::result::Result<Message, Box<(dyn std::error::Error + Send + Sync + 't4)>>>, std::result::Result<Message, Box<(dyn std::error::Error + Send + Sync + 't5)>>, impl Future, Option<Message>, Message}]>`
|
= note: required because it appears within the type `impl Future`
= note: required because it appears within the type `async_stream::async_stream::AsyncStream<std::result::Result<Message, Box<(dyn std::error::Error + Send + Sync + 'static)>>, impl Future>`
= note: required because it appears within the type `impl Stream`
= note: required because it appears within the type `tokio_stream::filter::_::__Origin<'_, impl Stream, [closure@streams/src/main.rs:22:17: 25:10]>`
= note: required because of the requirements on the impl of `Unpin` for `tokio_stream::filter::Filter<impl Stream, [closure@streams/src/main.rs:22:17: 25:10]>`
= note: required because it appears within the type `tokio_stream::map::_::__Origin<'_, tokio_stream::filter::Filter<impl Stream, [closure@streams/src/main.rs:22:17: 25:10]>, [closure@streams/src/main.rs:26:14: 26:40]>`
= note: required because of the requirements on the impl of `Unpin` for `tokio_stream::map::Map<tokio_stream::filter::Filter<impl Stream, [closure@streams/src/main.rs:22:17: 25:10]>, [closure@streams/src/main.rs:26:14: 26:40]>`
= note: required because it appears within the type `tokio_stream::take::_::__Origin<'_, tokio_stream::map::Map<tokio_stream::filter::Filter<impl Stream, [closure@streams/src/main.rs:22:17: 25:10]>, [closure@streams/src/main.rs:26:14: 26:40]>>`
= note: required because of the requirements on the impl of `Unpin` for `tokio_stream::take::Take<tokio_stream::map::Map<tokio_stream::filter::Filter<impl Stream, [closure@streams/src/main.rs:22:17: 25:10]>, [closure@streams/src/main.rs:26:14: 26:40]>>`
Copy
如果你遇到这样的错误信息,请尝试 pin 住该值!
在尝试运行这个之前,启动Mini-Redis服务器:
$ mini-redis-server
然后尝试运行该代码。我们将看到输出到STDOUT的信息:
got = Ok(Message { channel: "numbers", content: b"1" })
got = Ok(Message { channel: "numbers", content: b"two" })
got = Ok(Message { channel: "numbers", content: b"3" })
got = Ok(Message { channel: "numbers", content: b"four" })
got = Ok(Message { channel: "numbers", content: b"five" })
got = Ok(Message { channel: "numbers", content: b"6" })
Copy
一些早期的信息可能会被丢弃,因为订阅和发布之间存在着竞争。该程序永远不会退出。只要服务器处于活动状态,对 Mini-Redis 频道的订阅就会保持活动状态。
让我们看看我们如何用流来扩展这个程序。
接受一个 stream 并返回另一个 stream 的函数通常被称为 “stream adapter”,因为它们是 “适配器模式"的一种形式。常见的流适配器包括 map
、take
和 filter
。
让我们更新Mini-Redis,使其退出。在收到三条消息后,停止迭代消息。这是用 take
完成的。这个适配器将流限制为最多产生n条消息。
let messages = subscriber
.into_stream()
.take(3);
Copy
再次运行该程序,我们得到:
got = Ok(Message { channel: "numbers", content: b"1" })
got = Ok(Message { channel: "numbers", content: b"two" })
got = Ok(Message { channel: "numbers", content: b"3" })
Copy
这一次,程序结束了。
现在,让我们把信息流限制在单个字节。我们将通过检查消息的长度来检查这一点。我们使用 filter
适配器来放弃任何不符合前提条件的消息。
let messages = subscriber
.into_stream()
.filter(|msg| match msg {
Ok(msg) if msg.content.len() == 1 => true,
_ => false,
})
.map(|msg| msg.unwrap().content)
.take(3);
Copy
现在,输出是:
got = b"1"
got = b"3"
got = b"6"
Copy
另一个选择是使用 filter_map
将 filter 和 map 步骤合并为一个单一的调用。
还有更多可用的适配器。请看这里的列表。
stream 特征与 future 特征非常相似:
use std::pin::Pin;
use std::task::{Context, Poll};
pub trait Stream {
type Item;
fn poll_next(
self: Pin<&mut Self>,
cx: &mut Context<'_>
) -> Poll<Option<Self::Item>>;
fn size_hint(&self) -> (usize, Option<usize>) {
(0, None)
}
}
Copy
Stream::poll_next()
函数很像 Future::poll
,只是它可以被反复调用,以便从流中接收许多值。就像我们在 Async in depth
中看到的那样,当一个流还没有准备好返回一个值时,就会返回 Poll::Pending
来代替。该任务的waker被注册。一旦流应该被再次poll,该唤醒者将被通知。
size_hint()
方法与迭代器的使用方法相同。
通常,当手动实现一个 stream 时,是通过组合 future 和其他流来完成的。作为一个例子,让我们以我们在 Async 中深入实现的 Delay future 为基础。我们将把它转换为一个stream,以10毫秒的间隔产生三次 ()
。
use tokio_stream::Stream;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::Duration;
struct Interval {
rem: usize,
delay: Delay,
}
impl Stream for Interval {
type Item = ();
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>)
-> Poll<Option<()>>
{
if self.rem == 0 {
// No more delays
return Poll::Ready(None);
}
match Pin::new(&mut self.delay).poll(cx) {
Poll::Ready(_) => {
let when = self.delay.when + Duration::from_millis(10);
self.delay = Delay { when };
self.rem -= 1;
Poll::Ready(Some(()))
}
Poll::Pending => Poll::Pending,
}
}
}
Copy
async-stream
使用 Stream trait 手动实现流是很繁琐的。不幸的是,Rust编程语言还不支持用于定义流的 async/await 语法。这一点正在酝酿之中,但还没有准备好。
Async-stream
crate可以作为一个临时的解决方案。这个 crate 提供了一个 async_stream!
宏,将输入转化为一个流。使用这个create,上面的 interval 可以这样实现:
use async_stream::stream;
use std::time::{Duration, Instant};
stream! {
let mut when = Instant::now();
for _ in 0..3 {
let delay = Delay { when };
delay.await;
yield ();
when += Duration::from_millis(10);
}
}