Future Trait

优质
小牛编辑
131浏览
2023-12-01

Future trait 是 Rust 异步编程的中心。一个Future就是可以产生值的异步计算(尽管该值可能为空,例如())。一种简化版本的 Future trait 可能如下所示:

trait SimpleFuture {
    type Output;
    fn poll(&mut self, wake: fn()) -> Poll<Self::Output>;
}

enum Poll<T> {
    Ready(T),
    Pending,
}

Future 可以通过调用poll函数,推动 Future,让其尽可能接近完成。如果 Future 完成,它将返回Poll::Ready(result)。如果 Future 还不能完成,它将返回Poll::Pending,并安排wake()Future准备进一步的时候,调用。当wake()调用,executor 驱使 Future,再次调用poll,这样Future就离完成再进一步了。

若是没有wake(),executor 将无法知道某个特定的 Future 何时可以前进,并且将不得不,不断地 poll 每个 Future 。而有了wake(),executor 就能确切知道哪些 Future,已准备polled。

例如,考虑以下情况:我们想从一个 socket(套接字)中,读取数据,而该 socket 的数据不知道有没有。如果数据,我们可以读并返回Poll::Ready(data),但如果没有任何数据 ready,那么我们的 Future 将阻塞,无法再前进。处在没有可用数据时期,当 socket 上的数据 ready 时,我们必须挂上要调用的wake,这将告诉 executor,我们的 Future 已准备好前进。

一个简单的SocketRead Future 可能看起来像这样:

pub struct SocketRead<'a> {
    socket: &'a Socket,
}

impl SimpleFuture for SocketRead<'_> {
    type Output = Vec<u8>;

    fn poll(&mut self, wake: fn()) -> Poll<Self::Output> {
        if self.socket.has_data_to_read() {
            // The socket has data-- read it into a buffer and return it.
            Poll::Ready(self.socket.read_buf())
        } else {
            // The socket does not yet have data.
            //
            // Arrange for `wake` to be called once data is available.
            // When data becomes available, `wake` will be called, and the
            // user of this `Future` will know to call `poll` again and
            // receive data.
            self.socket.set_readable_callback(wake);
            Poll::Pending
        }
    }
}

这个Futures 模型可以将多个异步操作组合在一起,而无需中间分配。一次运行多个 Future 或将 Future 链接在一起,可以通过无分配状态机来实现,如下所示:

/// A SimpleFuture that runs two other futures to completion concurrently.
///
/// Concurrency is achieved via the fact that calls to `poll` each future
/// may be interleaved, allowing each future to advance itself at its own pace.
pub struct Join<FutureA, FutureB> {
    // Each field may contain a future that should be run to completion.
    // If the future has already completed, the field is set to `None`.
    // This prevents us from polling a future after it has completed, which
    // would violate the contract of the `Future` trait.
    a: Option<FutureA>,
    b: Option<FutureB>,
}

impl<FutureA, FutureB> SimpleFuture for Join<FutureA, FutureB>
where
    FutureA: SimpleFuture<Output = ()>,
    FutureB: SimpleFuture<Output = ()>,
{
    type Output = ();
    fn poll(&mut self, wake: fn()) -> Poll<Self::Output> {
        // Attempt to complete future `a`.
        if let Some(a) = &mut self.a {
            if let Poll::Ready(()) = a.poll(wake) {
                self.a.take();
            }
        }

        // Attempt to complete future `b`.
        if let Some(b) = &mut self.b {
            if let Poll::Ready(()) = b.poll(wake) {
                self.b.take();
            }
        }

        if self.a.is_none() && self.b.is_none() {
            // Both futures have completed-- we can return successfully
            Poll::Ready(())
        } else {
            // One or both futures returned `Poll::Pending` and still have
            // work to do. They will call `wake()` when progress can be made.
            Poll::Pending
        }
    }
}

这显示了,如何在无需单独分配的情况下,同时运行多个 Future ,从而可以实现更高效的异步程序。同样,可以依次运行多个有序 Future ,如下所示:

/// A SimpleFuture that runs two futures to completion, one after another.
//
// Note: for the purposes of this simple example, `AndThenFut` assumes both
// the first and second futures are available at creation-time. The real
// `AndThen` combinator allows creating the second future based on the output
// of the first future, like `get_breakfast.and_then(|food| eat(food))`.
pub struct AndThenFut<FutureA, FutureB> {
    first: Option<FutureA>,
    second: FutureB,
}

impl<FutureA, FutureB> SimpleFuture for AndThenFut<FutureA, FutureB>
where
    FutureA: SimpleFuture<Output = ()>,
    FutureB: SimpleFuture<Output = ()>,
{
    type Output = ();
    fn poll(&mut self, wake: fn()) -> Poll<Self::Output> {
        if let Some(first) = &mut self.first {
            match first.poll(wake) {
                // We've completed the first future-- remove it and start on
                // the second!
                Poll::Ready(()) => self.first.take(),
                // We couldn't yet complete the first future.
                Poll::Pending => return Poll::Pending,
            };
        }
        // Now that the first future is done, attempt to complete the second.
        self.second.poll(wake)
    }
}

这些示例说明了Futuretrait 可用于表示异步控制流,而无需多个分配的对象,和深层嵌套的回调。随着基本控制流程的发展,让我们谈谈真正的Future trait 及其不同之处。

trait Future {
    type Output;
    fn poll(
        // Note the change from `&mut self` to `Pin<&mut Self>`:
        self: Pin<&mut Self>,
        // and the change from `wake: fn()` to `cx: &mut Context<'_>`:
        cx: &mut Context<'_>,
    ) -> Poll<Self::Output>;
}

您会注意到的第一个变化是self类型,不再&mut self,而更改为Pin<&mut Self>。我们将详细讨论 pinning 在稍后章节,但现在知道它使我们能够创建 Immovable(无法移动) 的 Future 。无法移动的对象可以在其字段之间存储指针,例如struct MyFut { a: i32, ptr_to_a: *const i32 }。Pinning 是启用 async/await 所必需的。

其次,wake: fn()已更改为&mut Context<'_>。在SimpleFuture,我们使用了对函数指针(fn())的一个 call,去告诉 Future 的 executor,应该对有问题的 Future 进行 poll。但是,由于fn()大小为零(zero-sized),无法存储有关哪一个 Future调用了wake

在现实世界中,像 Web 服务器这样的复杂应用程序,可能具有成千上万个不同的连接,其唤醒都应单独进行管理。Context type 通过提供对一个Waker类型值的访问,来解决此问题,可用于唤醒特定任务。