当前位置: 首页 > 知识库问答 >
问题:

在rust/tokio中合并流

余天宇
2023-03-14

Tokio有一个Merge数据结构,它允许“合并”两个同质流,并忘记出处。

impl<T, U> Stream for Merge<T, U> where
    T: Stream,
    U: Stream<Item = T::Item>, { ...

是否存在一个流的代数逐点标记并,它从a的流和B的流生成a的流?

共有1个答案

公瑞
2023-03-14

我不认为它是作为tokio中的一种方法直接提供的,而是您自己非常简单地将其拼凑在一起。在Rust standard库中没有类型,但是和其他大多数东西一样,有一个板条箱。

use either::Either; // 0.3.7
use tokio::stream::StreamExt as _;

stream1
    .map(Either::Left)
    .merge(stream2.map(Either::Right))
 类似资料:
  • 我将main函数创建为tokio::main。在main函数中,我创建了两个异步任务,一个侦听通道,另一个触发C-API中的消息队列。如果消息可用,我希望通过回调函数上的通道发送它们,这样我就可以接收任务上的消息,在那里我正在监听事件。稍后,我希望通过SSE或GraphQL订阅将这些消息发送给几个客户机。 我不能更改C回调,因为它们需要被C-API通过,我必须使用回调,否则我不会得到消息。 我最新

  • 我正在尝试使用Tokio包装一个同步MQTT客户机库。代码需要通过通道连续接收消息,并将它们发送到异步代码中。我了解如何使用包装返回单个值的代码。但是如何将此应用于包装从通道连续接收消息的循环?

  • 我在Prod运行一个rust Tokio应用程序。在上一个版本中,我有一个bug,一些请求导致我的代码进入无限循环。 发生的情况是,当进入无限循环的任务被卡住时,所有其他任务都继续正常工作并处理请求,直到延迟任务的数量足够高,导致我的程序没有响应。 我的问题是我们的监控系统花了很多时间来识别出哪里出了问题。例如:回答kubernetes健康检查的任务运行良好,但我无法识别系统中的任务是否已停滞。

  • Tokio 是 Rust 中的异步编程框架,它将复杂的异步编程抽象为 Futures、Tasks 和 Executor,并提供了 Timer 等基础设施。Tokio 快速、可靠,且可扩展。 Tokio 是一个事件驱动的非阻塞 I/O 平台,用于使用 Rust 编程语言编写异步应用。在高层设计上,它提供了一些主要组件: 多线程、工作窃取(work-stealing)的 task scheduler

  • 我有以下问题:我试图从函数调用闭包,但是以后必须调用另一个函数。 我无法创建异步闭包,因为它们目前不稳定: 所以我得以某种方式这样做。 我发现了几个与该问题相关的问题,例如,但是当我试图实现它时,我收到了以下错误: 这里是游乐场链接,希望能显示我有什么问题。 我正在使用标题中所述的tokio。

  • 现在,我必须编写一个异步main函数,用标记它,并在其中调用,以便在期间运行一些异步函数。 是否有其他更好的方法允许doc测试像正常测试函数一样运行,例如?另外,如果标记可以显示在文档中,那么用户就可以复制文档并直接将其用作项目中的测试。(这可能可以像那样实现?)