当前位置: 首页 > 知识库问答 >
问题:

等待编译时未知的未来数

储承
2023-03-14

我想利用Tokio的运行时来处理可变数量的异步期货。由于在编译时期货的计数是未知的,因此 FuturesUnorderd 似乎是我的最佳选择(像 select 这样的宏需要在编译时指定你的分支;join_all可能是可能的,但是当顺序无关紧要时,文档建议“在很多情况下”FuturesUnordered)。

这个片段的逻辑是一个recv()循环,它被推送到期货桶中,应该一直运行。当新数据到达时,它的解析/处理也被推到futures桶中(而不是立即被处理)。这确保了接收器在响应新事件时保持低延迟,并且数据处理(潜在地计算昂贵的解密)与所有其他数据处理异步块(加上监听接收器)同时发生。

顺便说一下,这个线程解释了为什么期货会得到. box()

问题是这个神秘的错误:

error[E0277]: `dyn futures::Future<Output = ()> + std::marker::Send` cannot be shared between threads safely
  --> src/main.rs:27:8
   |
27 |     }).boxed());
   |        ^^^^^ `dyn futures::Future<Output = ()> + std::marker::Send` cannot be shared between threads safely
   |
   = help: the trait `Sync` is not implemented for `dyn futures::Future<Output = ()> + std::marker::Send`
   = note: required because of the requirements on the impl of `Sync` for `Unique<dyn futures::Future<Output = ()> + std::marker::Send>`
   = note: required because it appears within the type `Box<dyn futures::Future<Output = ()> + std::marker::Send>`
   = note: required because it appears within the type `Pin<Box<dyn futures::Future<Output = ()> + std::marker::Send>>`
   = note: required because of the requirements on the impl of `Sync` for `FuturesUnordered<Pin<Box<dyn futures::Future<Output = ()> + std::marker::Send>>>`
   = note: required because of the requirements on the impl of `std::marker::Send` for `&FuturesUnordered<Pin<Box<dyn futures::Future<Output = ()> + std::marker::Send>>>`
   = note: required because it appears within the type `[static generator@src/main.rs:16:25: 27:6 _]`
   = note: required because it appears within the type `from_generator::GenFuture<[static generator@src/main.rs:16:25: 27:6 _]>`
   = note: required because it appears within the type `impl futures::Future`

这看起来像是“递归地”推到一个无序的未来(我猜不是真的,但是你还能称它为什么?)不起作用,但我不确定为什么。此错误表示不满足盒的某些< code>Sync特征要求

use std::error::Error;
use tokio::sync::mpsc::{self, Receiver, Sender};
use futures::stream::futures_unordered::FuturesUnordered;
use futures::FutureExt;

#[tokio::main]
pub async fn main() -> Result<(), Box<dyn Error>> {
    let mut futures = FuturesUnordered::new();
    let (tx, rx) = mpsc::channel(32);
    
    tokio::spawn( foo(tx) );    // Only the receiver is relevant; its transmitter is
                                // elsewhere, occasionally sending data.
    futures.push((async {                               // <--- NOTE: futures.push()
        loop {
            match rx.recv().await {
                Some(data) => {
                    futures.push((async move {          // <--- NOTE: nested futures.push()
                        let _ = data; // TODO: replace with code that processes 'data'
                    }).boxed());
                },
                None => {}
            }
        }
    }).boxed());
    
    while let Some(_) = futures.next().await {}

    Ok(())
}

共有2个答案

闻安宜
2023-03-14

当您使用 boxed 方法将异步块创建的未来装箱时,您正在尝试将其强制发送到 dyn Future Send

pub fn boxed<'a>(
    self
) -> Pin<Box<dyn Future<Output = Self::Output> + 'a + Send>>

但是,创建的未来不是“发送”。为什么?因为在它里面,你试图推动期货无序,借用它:

pub fn push(&self, future: Fut)

这意味着async块捕获一个

对于发送的引用,类型还必须是Sync

impl<'_, T> Send for &'_ T where
    T: Sync

为了使< code > futures ordered 成为< code>Sync,存储的期货也必须是< code>Sync:

impl<Fut: Sync> Sync for FuturesUnordered<Fut> {}

但是,box返回的未来不一定是Sync

pub fn boxed<'a>(
    self
) -> Pin<Box<dyn Future<Output = Self::Output> + 'a + Send>>

这意味着异步生成器不是< code>Send,所以您不能将它强制为< code>dyn Future Send,并且您会得到一个令人困惑的错误消息。

解决方案是添加一个绑定到未来的同步,并手动添加框::pin

type BoxedFuture = Pin<Box<dyn Future<Output = ()> + Send + Sync>>;

let mut futures = FuturesUnordered::<BoxedFuture>::new();

futures.push(Box::pin(async {
    loop {
        match rx.recv().await {
            Some(data) => {
                futures.push(Box::pin(async move {
                    let _ = data;
                }));
            }
            None => {}
        }
    }
}));

但是,您将遇到一系列借款问题。更好的解决方案是使用tokio::选择!而不是外部推送,正如Michael的回答所解释的那样。

徐德海
2023-03-14

我将把低级错误留给另一个答案,但是我认为解决这个高级问题的更惯用的方法是将< code > futures ordered 与类似于< code>tokio::select!如下:

use tokio::sync::mpsc;
use futures::stream::FuturesUnordered;
use futures::StreamExt;

#[tokio::main]
pub async fn main() {
    let mut futures = FuturesUnordered::new();
    let (tx, mut rx) = mpsc::channel(32);
    
    //turn foo into something more concrete
    tokio::spawn(async move {
        let _ = tx.send(42i32).await;
    });

    loop {
        tokio::select! {
            Some(data) = rx.recv() => {
                futures.push(async move {
                    data.to_string()
                });
            },
            Some(result) = futures.next() => {
                println!("{}", result)
            },
            else => break,
        }
    }
}

您可以在此处阅读有关选择宏的更多信息:https://tokio.rs/tokio/tutorial/select

 类似资料:
  • 我有一个方法,可以返回期货的 现在我想等待,直到所有的future都成功完成处理,或者future返回其输出的任何任务抛出异常。即使一项任务引发异常,等待另一项任务也没有意义。 简单的方法是 但这里的问题是,例如,如果第四个期货抛出异常,那么我将不必要地等待前三个期货可用。 如何解决这个问题?会以任何方式倒数闩锁帮助吗?我无法使用Future,因为java文档说

  • 我想运行相同类型的任务(工作线程),但一次不超过一定数量的任务。当任务完成时,其结果是新任务的输入,然后可以启动该任务。 有没有好的方法可以在C 11中使用异步/未来范式来实现这一点? 乍一看,它看起来很简单,你只是生成多个任务: 然后,运行以获取任务的异步结果。 然而,这里的问题是,未来的对象必须存储在某种队列中并一个接一个地等待。但是,可以一遍又一遍地迭代未来的对象,检查它们中的任何一个是否准

  • 问题内容: 我有一种返回List期货的方法 现在,我要等待,直到所有期货都成功完成处理,或者所有由期货返回输出的任务都引发异常。即使一项任务引发异常,也没有必要等待其他期货。 简单的方法是 但是这里的问题是,例如,如果第4个期货抛出异常,那么我将不必要地等待前3个期货可用。 如何解决呢?会以任何方式倒计时闩锁帮助吗?我无法使用Future,isDone因为Java文档说 问题答案: 你可以使用Co

  • 我希望像下面这样的代码可以等待这两种未来,但是没有。 我以为< code>seq.onComplete会在完成自身之前等待它们全部完成,但事实并非如此;它会导致: 在scala.concurrent.Future的源代码中有点难以遵循,我想知道如何实现等待(动态大小的)序列的所有原始未来的并行,或者这里可能有什么问题。 编辑:相关问题:https://worldbuilding.stackexch

  • 我在寻找一种方法来等待一个窗口弹出,你不知道的名字。我在找这样的东西:

  • 我正在尝试执行一些相互引用作为外键的查询,因此我必须等到外部未来完成。此算法放置在返回未来的函数中。 运行它时,我得到了这个: === TO CLIENT === 已完成存档 已完成全部 === TO CLIENT === 失败的 单词已完成 的单词 已完成 标记在博客中 完成投票 所以我的问题是,我怎么能等到未来的< code>mysql完成它的“childs”(在< code>whenComp