当前位置: 首页 > 知识库问答 >
问题:

Tokio::Select中的Rust lazy_static和Tokio::Sync::mpsc::Channel

闻枫
2023-03-14

我将main函数创建为tokio::main。在main函数中,我创建了两个异步任务,一个侦听通道,另一个触发C-API中的消息队列。如果消息可用,我希望通过回调函数上的通道发送它们,这样我就可以接收任务上的消息,在那里我正在监听事件。稍后,我希望通过SSE或GraphQL订阅将这些消息发送给几个客户机。

我不能更改C回调,因为它们需要被C-API通过,我必须使用回调,否则我不会得到消息。

我最新的方法看起来很简单:

use lazy_static::lazy_static;
use tokio::sync::{
    mpsc::{channel, Receiver, Sender},
    Mutex,
};
use bindgen::{notify_connect, notify_connectionstate};

lazy_static! {
    static ref BROADCAST_CONNECT: Mutex<(Sender<bool>, Receiver<bool>)> = Mutex::new(channel(128));
    static ref BROADCAST_CONNECTIONSTATE: Mutex<(Sender<u32>, Receiver<u32>)> = Mutex::new(channel(128));
}

#[tokio::main]
async fn main() {    
    unsafe { notify_connect(Some(_notify_connect)) } // pass the callback function to the C-API
    unsafe { notify_connectionstate(Some(_notify_connectionstate)) } // pass the callback function to the C-API

    tokio::spawn(async move { // wait for a channel to have a message
        loop {
            tokio::select! {
                // wating for a channel to receive a message
                Some(msg) = BROADCAST_CONNECT.lock().await.1.recv() => println!("{}", msg),
                Some(msg) = BROADCAST_CONNECTIONSTATE.lock().await.1.recv() => println!("{}", msg),
            }
        }
    });

    let handle2 = tokio::spawn(async move {
        loop {
            unsafe {
                message_queue_in_c(
                    some_handle,
                    true,
                    Duration::milliseconds(100).num_microseconds().unwrap(),
                )
            }
        }
    });

    handle.await.unwrap();
    habdle2.await.unwrap();
}

// the callback function that gets called from the C-API
unsafe extern "C" fn _notify_connect(is_connected: bool) {
    // C-API is not async, so use synchronous lock
    match BROADCAST_CONNECT.try_lock() {
        Ok(value) => match value.0.blocking_send(is_connected) {
            Ok(_) => {}
            Err(e) => {
                eprintln!("{}", e)
            }
        },
        Err(e) => {
            eprintln!("{}", e)
        }
    }
}

unsafe extern "C" fn _notify_connectionstate(connectionstate: u32) {
    match BROADCAST_CONNECTIONSTATE.try_lock() {
        Ok(value) => match value.0.blocking_send(connectionstate) {
            Ok(_) => {}
            Err(e) => {
                eprintln!("{}", e)
            }
        },
        Err(e) => {
            eprintln!("{}", e)
        }
    }
}
error[E0716]: temporary value dropped while borrowed
  --> src/main.rs:37:29
   |
35 | /             tokio::select! {
36 | |                 Some(msg) = BROADCAST_CONNECT.lock().await.1.recv() => println!("{}", msg),
37 | |                 Some(msg) = BROADCAST_CONNECTIONSTATE.lock().await.1.recv() => println!("{}", msg),
   | |                             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ creates a temporary which is freed while still in use
38 | |             }
   | |             -
   | |             |
   | |_____________temporary value is freed at the end of this statement
   |               borrow later captured here by closure
   |
   = note: consider using a `let` binding to create a longer lived value
use lazy_static::lazy_static;
use crossbeam::{
    channel::{bounded, Receiver, Sender},
    select,
};
use bindgen::{notify_connect, notify_connectionstate};

lazy_static! {
    static ref BROADCAST_CONNECT: (Sender<bool>, Receiver<bool>) = bounded(128);
    static ref BROADCAST_CONNECTIONSTATE: (Sender<u32>, Receiver<u32>) = bounded(128);
}

#[tokio::main]
async fn main() {    
    unsafe { notify_connect(Some(_notify_connect)) } // pass the callback function to the C-API
    unsafe { notify_connectionstate(Some(_notify_connectionstate)) } // pass the callback function to the C-API

    let handle1 = tokio::spawn(async move {
        loop {
            select! {
                recv(&BROADCAST_CONNECT.1) -> msg => println!("is_connected: {:?}", msg.unwrap()),
                recv(&BROADCAST_CONNECTIONSTATE.1) -> msg => println!("connectionstate: {:?}", msg.unwrap()),
            }
        }
    });

    let handle2 = tokio::spawn(async move {
        loop {
            unsafe {
                message_queue_in_c(
                    some_handle,
                    true,
                    Duration::milliseconds(100).num_microseconds().unwrap(),
                )
            }
        }
    });

    handle.await.unwrap();
    handle2.await.unwrap();
}

// the callback function thats gets called from the C-API
unsafe extern "C" fn _notify_connect(is_connected: bool) {
    match &BROADCAST_CONNECT.0.send(is_connected) {
        Ok(_) => {}
        Err(e) => eprintln!("{}", e),
    };
}

unsafe extern "C" fn _notify_connectionstate(connectionstate: u32) {
    match BROADCAST_CONNECTIONSTATE.0.send(connectionstate) {
        Ok(_) => {}
        Err(e) => eprintln!("{}", e),
    }
}
use tokio::sync::{
    mpsc::{channel, Receiver, Sender},
    Mutex,
};
use bindgen::{notify_connect, notify_connectionstate};

#[tokio::main]
async fn main() {
    let app = app::App::new();

    let mut broadcast_connect = channel::<bool>(128);
    let mut broadcast_connectionstate = channel::<bool>(128);

    let notify_connect = {
        unsafe extern "C" fn _notify_connect(is_connected: bool) {
            match broadcast_connect.0.blocking_send(is_connected) {
                Ok(_) => {}
                Err(e) => {
                    eprintln!("{}", e)
                }
            }
        }
    };

    let notify_connectionstate = {
        unsafe extern "C" fn _notify_connectionstate(connectionstate: u32) {
            match broadcast_connectionstate.0.blocking_send(connectionstate) {
                Ok(_) => {}
                Err(e) => {
                    eprintln!("{}", e)
                }
            }
        }
    };

    unsafe { notify_connect(Some(notify_connect)) } // pass the callback function to the C-API
    unsafe { notify_connectionstate(Some(notify_connectionstate)) } // pass the callback function to the C-API

    let handle = tokio::spawn(async move {
        loop {
            tokio::select! {
                Some(msg) = broadcast_connect.1.recv() => println!("{}", msg),
                Some(msg) = broadcast_connectionstate.1.recv() => println!("{}", msg),
            }
        }
    });

    let handle2 = tokio::spawn(async move {
        loop {
            unsafe {
                message_queue_in_c(
                    some_handle,
                    true,
                    Duration::milliseconds(100).num_microseconds().unwrap(),
                )
            }
        }
    });

    handle.await.unwrap();
    handle2.await.unwrap();
}
can't capture dynamic environment in a fn item
  --> src/main.rs:47:19
   |
47 |             match broadcast_connectionstate.0.blocking_send(connectionstate) {
   |                   ^^^^^^^^^^^^^^^^^^^^^^^^^
   |
   = help: use the `|| { ... }` closure form instead

已经谢谢你了,一直读到这里。

共有1个答案

田兴怀
2023-03-14

如果对第一个示例进行以下更改,它应该可以工作:

  1. std::sync::mutex替换tokio::sync::mutex,这样就不必在回调中使用try_lock
  2. 不要将接收方存储在互斥体中,只存储发送方。
  3. 在回调中,要么使用无界通道,要么确保在发送之前释放锁。
  4. 在使用std::thread::spawn而不是在tokio::spawn的专用线程上运行阻塞C代码。(为什么?)

若要不将接收方存储在互斥体中,可以执行以下操作:

static ref BROADCAST_CONNECT: Mutex<Option<Sender<bool>>> = Mutex::new(None);

// in main
let (send, recv) = channel(128);
*BROADCAST_CONNECT.lock().unwrap() = Some(send);
// in C callback
let lock = BROADCAST_CONNECT.lock().unwrap();
let send = lock.as_ref().clone();
drop(lock);
send.blocking_send(...);
 类似资料:
  • 我正在尝试使用Tokio包装一个同步MQTT客户机库。代码需要通过通道连续接收消息,并将它们发送到异步代码中。我了解如何使用包装返回单个值的代码。但是如何将此应用于包装从通道连续接收消息的循环?

  • Tokio 是 Rust 中的异步编程框架,它将复杂的异步编程抽象为 Futures、Tasks 和 Executor,并提供了 Timer 等基础设施。Tokio 快速、可靠,且可扩展。 Tokio 是一个事件驱动的非阻塞 I/O 平台,用于使用 Rust 编程语言编写异步应用。在高层设计上,它提供了一些主要组件: 多线程、工作窃取(work-stealing)的 task scheduler

  • axum 是一个使用了 Tokio、Tower 和 Hyper,并专注于模块化的 Web 应用程序框架。 其高级功能包括: 用一个无宏的 API 将请求路由到处理程序。 使用提取器对请求进行声明式的解析。 简单且可预测的错误处理模型。 用最小的模板生成响应。 充分利用 tower 和 tower-http 的中间件、服务和工具的生态系统。 特别是最后一点,是 axum 与其他框架不同的地方。axu

  • 有一个Merge数据结构,它允许“合并”两个同质流,并忘记出处。 是否存在一个流的代数逐点标记并,它从的流和的流生成的流?

  • 我在Prod运行一个rust Tokio应用程序。在上一个版本中,我有一个bug,一些请求导致我的代码进入无限循环。 发生的情况是,当进入无限循环的任务被卡住时,所有其他任务都继续正常工作并处理请求,直到延迟任务的数量足够高,导致我的程序没有响应。 我的问题是我们的监控系统花了很多时间来识别出哪里出了问题。例如:回答kubernetes健康检查的任务运行良好,但我无法识别系统中的任务是否已停滞。

  • 我正在用JNI异步执行测试Rust。我想在Rust中执行请求,并用回调将结果异步返回到Android。我正在测试代码来执行命令行中的请求,它工作得很好。 这就是它在命令行上的工作方式: null null 我正在检查它是如何工作的,但示例似乎已经过时:*https://github.com/mozilla/rust-android-gradle/blob/master/samples/rust/s