select!

优质
小牛编辑
117浏览
2023-12-01

futures::select宏能同时运行多个 Future ,从而使用户,可以在任何 Future 完成后,立即做出响应。

use futures::{
    future::FutureExt, // for `.fuse()`
    pin_mut,
    select,
};

async fn task_one() { /* ... */ }
async fn task_two() { /* ... */ }

async fn race_tasks() {
    let t1 = task_one().fuse();
    let t2 = task_two().fuse();

    pin_mut!(t1, t2);

    select! {
        () = t1 => println!("task one completed first"),
        () = t2 => println!("task two completed first"),
    }
}

上面的函数将同时运行t1t2同时。当t1或是t2完成后,相应的处理程序将调用println!,而该函数将在不完成剩余任务的情况下,结束。

select的基本语法是<pattern> = <expression> => <code>,,重复您想要在select上使用的,任意数量的 Future 。

default => ... and complete => ...

select也支持defaultcomplete分支。

如果select了的 Futures 没有一个是完成的,default分支将运行。select带上一个default分支的组合,始终会立即返回,因为default在其他 Future 均未准备好,就运行了。

complete分支可以用来处理,selected Future 全部完成,并且将不再前进。对一个select!循环访问时通常很方便。

use futures::{future, select};

async fn count() {
    let mut a_fut = future::ready(4);
    let mut b_fut = future::ready(6);
    let mut total = 0;

    loop {
        select! {
            a = a_fut => total += a,
            b = b_fut => total += b,
            complete => break,
            default => unreachable!(), // never runs (futures are ready, then complete)
        };
    }
    assert_eq!(total, 10);
}

Interaction with Unpin and FusedFuture

您可能在上面的第一个示例中,注意到的一件事是,我们必须在两个async fn返回的 Future 上调用.fuse(),以及将用pin_mut固定。这两个调用都是必需的,因为在select中使用的 futures ,必须同时实现了Unpin trait 与FusedFuture trait。

Unpin是必要的,因为select不是取值的,而是可变的引用。由于不拥有 Future 的所有权,因此可以在对select的调用后,未完成的 futures 还可以再次使用。

同样,FusedFuture trait 是必需的,因为select在一个 future 完成后,必不得对它再轮询。FusedFuture是 一个 Future trait,作用是追踪 Future 本身是否完成的。这样使得,select能在一个循环中使用,只轮询仍未完成的 Futures。可以在上面的示例中看到,其中a_fut或是b_fut是在循环第二次时完成。因为 future::ready返回的 Future 实现了FusedFuture,它可以告诉select不要再次轮询。

请注意,streams 具有对应的FusedStream trait。实现此 trait 或由.fuse()封装的 Streams,会从他们的 Future .next()/.try_next()组合器中, yield 出FusedFuture futures。

use futures::{
    stream::{Stream, StreamExt, FusedStream},
    select,
};

async fn add_two_streams(
    mut s1: impl Stream<Item = u8> + FusedStream + Unpin,
    mut s2: impl Stream<Item = u8> + FusedStream + Unpin,
) -> u8 {
    let mut total = 0;

    loop {
        let item = select! {
            x = s1.next() => x,
            x = s2.next() => x,
            complete => break,
        };
        if let Some(next_num) = item {
            total += next_num;
        }
    }

    total
}

Concurrent tasks in a select loop with Fuse and FuturesUnordered

一个有点难以发现但方便的函数是Fuse::terminated(),它允许构造一个已经终止的,空的 Future,之后可以用需要运行的 Future 填充它。

有个方便的情况就是,有一个任务需要在一个select循环内运行,但这个循环又是在这个select循环本身里面创建的。

注意使用.select_next_some()函数。可以与select合作,只运行那些由 stream 返回的Some(_)值,而忽略Nones。

use futures::{
    future::{Fuse, FusedFuture, FutureExt},
    stream::{FusedStream, Stream, StreamExt},
    pin_mut,
    select,
};

async fn get_new_num() -> u8 { /* ... */ 5 }

async fn run_on_new_num(_: u8) { /* ... */ }

async fn run_loop(
    mut interval_timer: impl Stream<Item = ()> + FusedStream + Unpin,
    starting_num: u8,
) {
    let run_on_new_num_fut = run_on_new_num(starting_num).fuse();
    let get_new_num_fut = Fuse::terminated();
    pin_mut!(run_on_new_num_fut, get_new_num_fut);
    loop {
        select! {
            () = interval_timer.select_next_some() => {
                // The timer has elapsed. Start a new `get_new_num_fut`
                // if one was not already running.
                if get_new_num_fut.is_terminated() {
                    get_new_num_fut.set(get_new_num().fuse());
                }
            },
            new_num = get_new_num_fut => {
                // A new number has arrived-- start a new `run_on_new_num_fut`,
                // dropping the old one.
                run_on_new_num_fut.set(run_on_new_num(new_num).fuse());
            },
            // Run the `run_on_new_num_fut`
            () = run_on_new_num_fut => {},
            // panic if everything completed, since the `interval_timer` should
            // keep yielding values indefinitely.
            complete => panic!("`interval_timer` completed unexpectedly"),
        }
    }
}

如果需要同时运行多个相同 Future 的副本,请使用FuturesUnordered类型。以下示例与上面的示例相似,但是将运行run_on_new_num_fut的每个副本,直到完成,而不是在创建新的时,终止它们。还会打印出一个由run_on_new_num_fut返回的值。

use futures::{
    future::{Fuse, FusedFuture, FutureExt},
    stream::{FusedStream, FuturesUnordered, Stream, StreamExt},
    pin_mut,
    select,
};

async fn get_new_num() -> u8 { /* ... */ 5 }

async fn run_on_new_num(_: u8) -> u8 { /* ... */ 5 }

// Runs `run_on_new_num` with the latest number
// retrieved from `get_new_num`.
//
// `get_new_num` is re-run every time a timer elapses,
// immediately cancelling the currently running
// `run_on_new_num` and replacing it with the newly
// returned value.
async fn run_loop(
    mut interval_timer: impl Stream<Item = ()> + FusedStream + Unpin,
    starting_num: u8,
) {
    let mut run_on_new_num_futs = FuturesUnordered::new();
    run_on_new_num_futs.push(run_on_new_num(starting_num));
    let get_new_num_fut = Fuse::terminated();
    pin_mut!(get_new_num_fut);
    loop {
        select! {
            () = interval_timer.select_next_some() => {
                // The timer has elapsed. Start a new `get_new_num_fut`
                // if one was not already running.
                if get_new_num_fut.is_terminated() {
                    get_new_num_fut.set(get_new_num().fuse());
                }
            },
            new_num = get_new_num_fut => {
                // A new number has arrived-- start a new `run_on_new_num_fut`.
                run_on_new_num_futs.push(run_on_new_num(new_num));
            },
            // Run the `run_on_new_num_futs` and check if any have completed
            res = run_on_new_num_futs.select_next_some() => {
                println!("run_on_new_num_fut returned {:?}", res);
            },
            // panic if everything completed, since the `interval_timer` should
            // keep yielding values indefinitely.
            complete => panic!("`interval_timer` completed unexpectedly"),
        }
    }
}