已应用:生成一个执行器
Rust 的Future
是懒惰的:除非是向着'完成'这一个目标积极前进,否则他们不会做任何事情。向 Future 完成前进的一种方法是,在async
函数里面,对它.await
,但这只会将问题升了个级:谁来管理,从顶层 async
函数返回的 Futures ?答案是:我们需要一个Future
执行者(executor)。
Future
executor 获取一组顶层Future
,并每当Future
可以前进时,通过调用poll
,让它们驶向完成。通常一旦开始,executor 会poll
一个 Future 。当Future
表示,因wake()
的调用准备好前进,会将它们先放回到一个队列,才再次poll
,重复直到Future
已经完成。
在本节中,我们将编写自己的简单 executor,该 executor 能够让大量顶层 Future 同时驶向完成。
在此示例中,我们依赖futures
箱子,ArcWake
trait 会用到,它提供了一种轻松的方法来构建Waker
。
[package]
name = "xyz"
version = "0.1.0"
authors = ["XYZ Author"]
edition = "2018"
[dependencies]
futures-preview = "=0.3.0-alpha.17"
接下来,我们需要在顶部,添加以下内容src/main.rs
:
use {
futures::{
future::{FutureExt, BoxFuture},
task::{ArcWake, waker_ref},
},
std::{
future::Future,
sync::{Arc, Mutex},
sync::mpsc::{sync_channel, SyncSender, Receiver},
task::{Context, Poll},
time::Duration,
},
// The timer we wrote in the previous section:
timer_future::TimerFuture,
};
我们的 executor 的工作是,将通过发送任务,在通道上运行。executor 将事件从通道中拉出,并运行它们。当一个任务准备做更多的工作(被唤醒)时,它可以安排自己重新回到通道上,以计划再次进行轮询。
在这种设计中,executor 本身仅需要任务通道的接收端。用户将获得发送端,以便他们可以生成新的 Future 。任务本身就是 Future,是可以重新计划自己的。因此,我们会将它们与一个 sender 每每存储在一起,这样,任务就可以用来让自己重新排队。
/// Task executor that receives tasks off of a channel and runs them.
struct Executor {
ready_queue: Receiver<Arc<Task>>,
}
/// `Spawner` spawns new futures onto the task channel.
#[derive(Clone)]
struct Spawner {
task_sender: SyncSender<Arc<Task>>,
}
/// A future that can reschedule itself to be polled by an `Executor`.
struct Task {
/// In-progress future that should be pushed to completion.
///
/// The `Mutex` is not necessary for correctness, since we only have
/// one thread executing tasks at once. However, Rust isn't smart
/// enough to know that `future` is only mutated from one thread,
/// so we need use the `Mutex` to prove thread-safety. A production
/// executor would not need this, and could use `UnsafeCell` instead.
future: Mutex<Option<BoxFuture<'static, ()>>>,
/// Handle to place the task itself back onto the task queue.
task_sender: SyncSender<Arc<Task>>,
}
fn new_executor_and_spawner() -> (Executor, Spawner) {
// Maximum number of tasks to allow queueing in the channel at once.
// This is just to make `sync_channel` happy, and wouldn't be present in
// a real executor.
const MAX_QUEUED_TASKS: usize = 10_000;
let (task_sender, ready_queue) = sync_channel(MAX_QUEUED_TASKS);
(Executor { ready_queue }, Spawner { task_sender })
}
我们还向 spawner 添加一种方法,以使其易于生成新的 Future 。此方法将拿到一个 Future 类型,将其装箱,并放入 FutureObj 中,然后创建一个新类型Arc<Task>
,它的内部可以在 executor 上,排队。
impl Spawner {
fn spawn(&self, future: impl Future<Output = ()> + 'static + Send) {
let future = future.boxed();
let task = Arc::new(Task {
future: Mutex::new(Some(future)),
task_sender: self.task_sender.clone(),
});
self.task_sender.send(task).expect("too many tasks queued");
}
}
要轮询 Future ,我们需要创建一个Waker
。正如在唤醒章节,Waker
负责安排,一旦wake
调用了,就再次轮询的任务。记住,Waker
s 是会告诉 executor,确切的那些任务已经准备就绪,只轮询准备好前进的 Future。创建一个新的Waker
最简单的方法是,通过实现ArcWake
trait ,然后使用waker_ref
要么.into_waker()
函数,将Arc<impl ArcWake>
转变成一个Waker
。让我们,为我们的任务实现ArcWake
,这样就可以转变为Waker
,和被唤醒啦:
impl ArcWake for Task {
fn wake_by_ref(arc_self: &Arc<Self>) {
// Implement `wake` by sending this task back onto the task channel
// so that it will be polled again by the executor.
let cloned = arc_self.clone();
arc_self.task_sender.send(cloned).expect("too many tasks queued");
}
}
当一个新建的Waker
,从Arc<Task>
而来,那么我们在它上面调用wake()
,将导致Arc
的一个 copy 发送到任务通道。然后,我们的 executor 需要选择任务,并进行轮询。让我们实现一下:
impl Executor {
fn run(&self) {
while let Ok(task) = self.ready_queue.recv() {
// Take the future, and if it has not yet completed (is still Some),
// poll it in an attempt to complete it.
let mut future_slot = task.future.lock().unwrap();
if let Some(mut future) = future_slot.take() {
// Create a `LocalWaker` from the task itself
let waker = waker_ref(&task);
let context = &mut Context::from_waker(&*waker);
// `BoxFuture<T>` is a type alias for
// `Pin<Box<dyn Future<Output = T> + Send + 'static>>`.
// We can get a `Pin<&mut dyn Future + Send + 'static>`
// from it by calling the `Pin::as_mut` method.
if let Poll::Pending = future.as_mut().poll(context) {
// We're not done processing the future, so put it
// back in its task to be run again in the future.
*future_slot = Some(future);
}
}
}
}
}
恭喜你!我们现在有一个能工作的 Future executor。我们甚至可以使用它,来运行async/.await
代码和自定义 Future ,例如,我们之前写过的TimerFuture
:
fn main() {
let (executor, spawner) = new_executor_and_spawner();
// Spawn a task to print before and after waiting on a timer.
spawner.spawn(async {
println!("howdy!");
// Wait for our timer future to complete after two seconds.
TimerFuture::new(Duration::new(2, 0)).await;
println!("done!");
});
// Drop the spawner so that our executor knows it is finished and won't
// receive more incoming tasks to run.
drop(spawner);
// Run the executor until the task queue is empty.
// This will print "howdy!", pause, and then print "done!".
executor.run();
}