当前位置: 首页 > 知识库问答 >
问题:

在async Rust(Tokio)中包装阻塞mpsc

宗政鸿志
2023-03-14

我正在尝试使用Tokio包装一个同步MQTT客户机库。代码需要通过std::sync::mpsc通道连续接收消息,并将它们发送到异步代码中。我了解如何使用spawn_blocking包装返回单个值的代码。但是如何将此应用于包装从std::sync::mpsc通道连续接收消息的循环?

let (mut tx, mut rx) = std::sync::mpsc::channel();

tokio::spawn(async move {
            let mut mqtt_options = MqttOptions::new("bot", settings.mqtt.host, settings.mqtt.port);
            let (mut mqtt_client, notifications) = MqttClient::start(mqtt_options).unwrap();

            mqtt_client.subscribe(settings.mqtt.topic_name, QoS::AtLeastOnce).unwrap();

            tokio::task::spawn_blocking(move || {
                println!("Waiting for notifications");
                for notification in notifications {
                    match notification {
                        rumqtt::Notification::Publish(publish) => {
                            let payload = Arc::try_unwrap(publish.payload).unwrap();
                            let text: String = String::from_utf8(payload).expect("Can't decode payload for notification");
                            println!("Recieved message: {}", text);
                            let msg: Message = serde_json::from_str(&text).expect("Error while deserializing message");
                            println!("Deserialized message: {:?}", msg);
                            println!("{}", msg);
                            tx.send(msg);
                        }
                        _ => println!("{:?}", notification)
                    }
                }
            });
    });
tokio::task::spawn(async move || {
    // How to revieve messages via `rx` here? I can't use tokio::sync::mpsc channels 
    // since the code that sends messages is blocking.
});

共有1个答案

上官扬
2023-03-14

我在rust-lang社区发布了一个单独的帖子,并在那里得到了答案。

std::sync::mpsc::channel可以交换到tokio::sync::mpsc::unbounded_channel,它具有非异步发送方法。它解决了问题。

 类似资料:
  • 我有一个spring-webflux API,它在服务层需要从使用JDBC的现有存储库中读取。 在对这个主题进行了一些阅读之后,我希望将阻塞数据库调用的执行与我的非阻塞异步代码的其余部分分开。 我定义了一个专用的jdbcScheduler: 2)假设我想利用得到的mono,例如调用带有userIdMono结果的API,它将在哪个调度器上执行?是专门为jdbc调用创建的调度器,还是reactor通常

  • 问题内容: 我有一个对象,该对象带有一种方法,希望以类似以下方式向库客户端(尤其是脚本客户端)公开: 但是我可以使用的原始“东西”是一组事件驱动的类: 在其中,ImplementingThing接受输入,执行一些不可思议的工作,例如将任务排队入队列,然后稍后在发生结果时,在可能与调用ImplementingThing.doSomethingAsync()相同的线程上调用该线程。 有没有一种方法可以

  • 我正在尝试创建一个需要执行ssh命令的反应式应用程序。 目前,有一个SSH客户端(基于sshd mina)正在阻塞(也许有一种方法可以以非阻塞的方式使用它,但我不知道)。我的想法是围绕这个阻塞客户端创建一个包装器,这样我就可以像下面的代码一样将阻塞调用转换为Mono。 首先,这样做是不是一个好主意?什么会更好? 第二点是如何编写代码,以便我可以使用前面命令的响应执行一系列ssh命令来执行下一个命令

  • 我只是想知道是否可以使用SocketChannel类(带有ByteBuffer)来模拟Java中常规Socket类的阻塞特性。我做了两个测试项目,一个模拟客户端,另一个模拟服务器: 客户代码: 服务器代码: 正如我在评论中所说,运行服务器然后客户端(按该顺序)的结果是不可预测的,因为有时第二个数字可能保持为4或变为6,而更改分别为-1或4(整数字节)。 至于服务器端,我知道我可以让它等待第二个so

  • 如另一个问题中所述,当使用Undertow时,所有处理都应该在专用的工作线程池中完成,如下所示: 我知道可用于显式地告诉Undertow在专用的线程池中调度请求以阻止请求。我们可以通过将包装在实例中来修改上面的示例,如下所示: 调用此方法将exchange置于阻塞模式,并创建一个BlockingHttpExchange对象来存储流。当交换处于阻塞模式时,输入流方法变得可用,除了阻塞和非阻塞模式之间

  • 我想知道正常的java API调用(我的意思是没有I/O的方法)是否应该被线程化为“迷你阻塞调用”?是否可以像这样实现Reactive Streams(在返回Publisher之前调用方法): 而不是(在流中调用它) 此验证器仅用于示例。这种方法是否有任何缺点,或者这些方法在返回语句之前应该总是包含在流中?