当前位置: 首页 > 知识库问答 >
问题:

将异步函数传递给另一个代码块(不能满足编译器)

连坚白
2023-03-14

我目前正在跨线程发送闭包/函数。

这对于同步功能来说非常好。

我专门通过pub类型WSMethod

正在发送的函数示例

pub fn update_league(req: WSReq, conn: PgConn, _: &mut WSConnections_, _: Uuid) -> Result<String, BoxError>{
    let deserialized = serde_json::from_value(req.data)?;
    let league = db::update_league(&conn, deserialized)?;
    let resp_msg = WSMsgOut::resp(req.message_id, req.method, league);
    serde_json::to_string(&resp_msg).map_err(|e| e.into())
}

但是现在我想切换到发送异步函数,

即。

pub async fn upsert_competitions(req: WSReq, conn: PgConn, ws_conns: &mut WSConnections_, user_ws_id: Uuid) -> Result<String, BoxError>{
    let deserialized: Vec<NewCompetition> = serde_json::from_value(req.data)?;
    let competitions_out= db::upsert_competitions(&conn, deserialized.into_iter().map(transform_from).collect_vec())?;
    if let Some(ws_user) = ws_conns.lock().await.get_mut(&user_ws_id){
        sub_to_competitions(ws_user, competitions_out.iter().map(|c| &c.competition_id)).await;
    }
    publish_competitions(ws_conns, &competitions_out).await;
    let resp_msg = WSMsgOut::resp(req.message_id, req.method, competitions_out);
    serde_json::to_string(&resp_msg).map_err(|e| e.into())
}

它是完全相同的函数签名,它只是异步的。

当我把函数装箱以便它们可以被发送时,我得到了这个错误

Box::new(upsert_competitions))
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ expected enum `std::result::Result`, found opaque type

满:

288 | pub async fn upsert_competitions(req: WSReq, conn: PgConn, ws_conns: &mut WSConnections_, user_ws_id: Uuid) -> Result<String, BoxError>{
    |                                                                                                                ------------------------ the `Output` of this `async fn`'s found opaque type
    |
    = note:     expected enum `std::result::Result<std::string::String, std::boxed::Box<dyn std::error::Error + std::marker::Send + std::marker::Sync>>`
            found opaque type `impl core::future::future::Future`
    = note: required for the cast to the object type `dyn for<'r> std::ops::Fn(warp_ws_server::WSReq, diesel::r2d2::PooledConnection<diesel::r2d2::ConnectionManager<diesel::PgConnection>>, &'r mut std::sync::Arc<tokio::sync::mutex::Mutex<std::collections::HashMap<uuid::Uuid, warp_ws_server::WSConnection<subscriptions::Subscriptions>>>>, uuid::Uuid) -> std::result::Result<std::string::String, std::boxed::Box<dyn std::error::Error + std::marker::Send + std::marker::Sync>> + std::marker::Send + std::marker::Sync`

我已尝试附加。等待<code>到<code>方法(req,conn,ws_conns,user_ws_id)。等待,传递的方法的调用站点。

这会导致编译器错误,因为Future没有为Result实现。因此

我从:<代码>框中更改类型

它抱怨期货的大小,所以我框未来,然后另一个错误(见unpin),所以我钉错误。

最终导致Box

错误现在是

Box::new(upsert_competitions)^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ 预期结构'std::pin::Pin',发现不透明类型

expected struct `std::pin::Pin<std::boxed::Box<dyn core::future::future::Future<Output = std::result::Result<std::string::String, std::boxed::Box<dyn std::error::Error + std::marker::Send + std::marker::Sync>>> + std::marker::Send + std::marker::Sync>>`
            found opaque type `impl core::future::future::Future

我不明白如何从这里开始。我不认为我应该固定/装箱函数结果,我想在调用函数时固定/装箱返回的未来,但我认为我不能这样做,

当我调用func时,我当然想在func创建之后对未来进行装箱/钉扎,而不是更早。

我也尝试过这样的东西

框::新(引脚::新(框::新(upsert_competitions))))基于上述错误,

它给了我一个期望的< code>Fn

完整最新代码的来源:

封闭式def

作为常规函数成功传递了闭包

闭包作为异步func传递失败

闭包被调用

编辑:

最新更新(错误进展)

pub fn upsert_competitions(req: WSReq, conn: PgConn, ws_conns: &mut WSConnections_, user_ws_id: Uuid) -> Pin<Box<dyn Future<Output=Result<String, BoxError>> + Send + Sync>>{
    async fn hmmm(req: WSReq, conn: PgConn, ws_conns: &mut WSConnections_, user_ws_id: Uuid) -> Result<String, BoxError>{
        let deserialized: Vec<NewCompetition> = serde_json::from_value(req.data).expect("fuck");
        println!("{:?}", &deserialized);
        let competitions_out= db::upsert_competitions(&conn, deserialized.into_iter().map(transform_from).collect_vec()).expect("fuck");
        // assume anything upserted the user wants to subscribe to
        if let Some(ws_user) = ws_conns.lock().await.get_mut(&user_ws_id){
            sub_to_competitions(ws_user, competitions_out.iter().map(|c| &c.competition_id)).await;
        }
        // TODO ideally would return response before awaiting publishing going out
        publish_competitions(ws_conns, &competitions_out).await;
        println!("{:?}", &competitions_out);
        let resp_msg = WSMsgOut::resp(req.message_id, req.method, competitions_out);
        let out = serde_json::to_string(&resp_msg).map_err(|e| e.into());
        out
    }
    Box::pin(hmmm(req, conn, ws_conns, user_ws_id))
}
305 |     Box::pin(hmmm(req, conn, ws_conns, user_ws_id))
    |     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ future returned by 
`hmmm` is not `Sync`

所以现在只需要想出如何让未来同步

note: future is not `Sync` as this value is used across an await

给了我很好的线索

299 |         publish_competitions(ws_conns, &competitions_out).await;
    |         ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ await

出现在这里,conn可能会在以后使用

发现我必须在inner-async函数之外使用< code>conn,而不能在await中使用。

在修改了await中的变量之后,我现在到达了

error[E0621]: explicit lifetime required in the type of `ws_conns`
   --> src/handlers.rs:305:5
    |
289 | pub fn upsert_competitions(req: WSReq, conn: PgConn, ws_conns: &mut WSConnections_, user_ws_id: Uuid) -> Pin<Box<dyn Future<Output=Result<String, BoxError>> + Send + Sync>>{
    |                                                                ------------------- help: add explicit lifetime `'static` to the type of `ws_conns`: `&'static mut std::sync::Arc<tokio::sync::mutex::Mutex<std::collections::HashMap<uuid::Uuid, warp_ws_server::WSConnection<subscriptions::Subscriptions>>>>`
...
305 |     Box::pin(hmmm(req, competitions_out, ws_conns, user_ws_id))
    |     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ lifetime `'static` required

尝试过制作

我还尝试使用< code>upsert_competitions

但是,对于<code>std::sync::Arc,没有实现获取特性<code>lock_api::mutex::RawMutex</code>

我需要找到一个实现。lock(),但也是Arc实现的一个特征。


共有2个答案

魏朗
2023-03-14

user1937是一个简单的答案,可能有效(稍后将进行测试),

然而,一夜之间,我意识到将函数放入哈希图并移动对函数的引用的方法.....有点过分了。

这是对traits的使用(在一个地方我不知道实现,但是我可以定义一个接口,在另一个地方实现那个接口)

相反,我在lib中定义了一个async-trait(目前需要async-trait crate)

pub trait WSHandler<T: Subscriptions>{
    async fn ws_req_resp(
        msg: String, conn: PgConn, ws_conns: &mut WSConnections<T>, user_ws_id: Uuid
    ) -> Result<String, BoxError>;
}

并告诉funcs期待一个通用的WsHandler

async fn handle_ws_msg<T: Subscriptions, U: WSHandler<T>>(
    msg: ws::Message, conn: PgConn, ws_conns: &mut WSConnections<T>, user_ws_id: Uuid
) -> ws::Message{
    match msg.to_str(){
        // Can't get await inside `and_then`/`map` function chains to work properly
        Ok(msg_str) => match U::ws_req_resp(msg_str.to_string(), conn, ws_conns, user_ws_id).await{
            Ok(text) => ws::Message::text(text),
            Err(e) => ws_error_resp(e.to_string())
        },
        Err(_) => ws_error_resp(String::from("wtf. How does msg.to_str fail?"))
    }
}

然后,在我的主程序中,我能够植入这种特质

struct A{
}

#[async_trait]
impl WSHandler<subscriptions::Subscriptions> for A{

    async fn ws_req_resp(
        msg: String, conn: PgConn, ws_conns: &mut WSConnections<subscriptions::Subscriptions>, user_ws_id: Uuid
    ) -> Result<String, BoxError>{
        let req: WSReq = serde_json::from_str(&msg)?;
        println!("{}", &req.data);
        let stringybob = String::from("upsert_competitions");
        match req.method.clone(){
            a if a == stringybob => upsert_competitions2(req, conn, ws_conns, user_ws_id).await,
            // imagine the other methods here
            uwotm8 => Err(Box::new(InvalidRequestError{description: uwotm8.to_string()}))
        }
    }
}
ws.on_upgrade(move |socket| warp_ws_server::handle_ws_conn::<subscriptions::Subscriptions, A>(socket, pool, ws_conns))

14小时后,它终于运行了。万岁!D

仉刚洁
2023-03-14

当转换为 Fn 时,异步函数的返回类型被包装在 Future 中,而不是固定的未来,因为您只需要将其固定即可开始轮询。从一开始就创建固定的未来将使从多个异步函数构建组合期货的过程效率降低且更加复杂。所以正确的类型是酒吧类型WSMethod

因此,您需要将类型更改为Box

 类似资料:
  • 问题内容: 我尝试了以下失败的尝试: 在函数a中,我可以使用arguments关键字来访问参数数组,而在函数b中,这些参数将丢失。有没有办法像我尝试的那样将参数传递给另一个javascript函数? 问题答案: 用于对in函数具有相同的访问权,如下所示:

  • 问题内容: 是否可以通过某种方式将一个函数的范围传递给另一个函数? 例如, 我宁愿直接访问变量,即,不使用类似或的任何东西,而只是直接使用或。 问题答案: 真正访问函数私有范围的唯一方法是在函数内部声明,这样就形成了一个闭包,允许隐式访问变量。 这是您的一些选择。 直接访问 在内部声明。 如果您不想在中使用,则可以将它们都放在更大的容器范围内: 这些是您可以直接使用的变量而无需任何额外的代码来移动

  • 问题内容: 我想将传递给function()的所有参数作为参数传递给内部的另一个function(), 这可以在被调用过程中完成并将它们传递给,但是还有另一种方法吗? 本来 但是如果我的func1签名是 我如何将它们全部发送到func2,而不使用 有没有一种方法像在javascript中? 问题答案: 显式比隐式更好, 但是如果您真的不想键入一些字符,请执行以下操作: 都是局部变量,因此您不能在调

  • 问题内容: 我不理解以下示例,可以说我具有以下功能: 为什么#1是正确的解决方案,而#2是错误的解决方案?基本上是一个字典,所以如果我想将参数传递给openX,我认为正确的方法是没有并给出字典。但是python显然不喜欢第二个,并告诉我我给了3个而不是2个参数。那么,这背后的原因是什么? 问题答案: 在第二个示例中,您提供3个参数:文件名,模式和字典()。但是Python期望:2个形式参数和关键字

  • 问题内容: 使用异步模块时,如何将参数从上一个回调传递到下一个回调? 这是来自github上的文档的示例 问题答案: 您可以将异步函数与异步模块的函数链接在一起。这使您可以说:“先执行x,然后将结果传递给函数y,然后将结果传递给z”。从[docs] [1]复制: 您严格不需要异步模块来完成此任务;此功能旨在使代码更易于阅读。如果您不想使用异步模块,则始终可以只使用传统的回调。

  • 所以,我试图用discord js和sqlite3做一件事,但在过去的几个小时里,我一直在讨论这个问题,我仍然不知道如何解决它。 因此,我试图获取仅在中可用的数据,但我不知道如何将其传递出去,以便返回预期值。我已经尝试过使用全局变量,把它放在更大的范围内,但我仍然不知道如何做到这一点。 对不起,如果我弄糊涂了,我不习惯一直问问题。 编辑:应该说我正在使用NPMSqlite3模块。https://w