当前位置: 首页 > 知识库问答 >
问题:

我怎样才能在锈迹斑斑的溪流上“平展地图”?

干宏邈
2023-03-14

我有一个rusoto_core::ByteStream,它实现了Futures的Stream特征:

let chunks = vec![b"1234".to_vec(), b"5678".to_vec()];
let stream = ByteStream::new(stream::iter_ok(chunks));

我想把它传递给actix_web的httpresponseBuilder::streaming方法。

use actix_web::dev::HttpResponseBuilder; // 0.7.18
use rusoto_core::ByteStream; // 0.36.0

fn example(stream: ByteStream, builder: HttpResponseBuilder) {
    builder.streaming(stream);
}
error[E0271]: type mismatch resolving `<rusoto_core::stream::ByteStream as futures::stream::Stream>::Item == bytes::bytes::Bytes`
 --> src/main.rs:5:13
  |
5 |     builder.streaming(stream);
  |             ^^^^^^^^^ expected struct `std::vec::Vec`, found struct `bytes::bytes::Bytes`
  |
  = note: expected type `std::vec::Vec<u8>`
             found type `bytes::bytes::Bytes`

我认为解决方案是flatmapmybytestream,但我找不到这样的流方法。

下面是如何使用streaming()的示例:

let text = "123";
let (tx, rx_body) = mpsc::unbounded();
let _ = tx.unbounded_send(Bytes::from(text.as_bytes()));

HttpResponse::Ok()
    .streaming(rx_body.map_err(|e| error::ErrorBadRequest("bad request")))

共有1个答案

闻人冷勋
2023-03-14

如何在Rust中FlatMap流?

平面映射将迭代器的迭代器转换为单个迭代器(或流代替迭代器)。

Futures 0.3没有直接的平面映射,但它有Streamext::Flatten,可以在Streamext::map之后使用。

use futures::{stream, Stream, StreamExt}; // 0.3.1

fn into_many(i: i32) -> impl Stream<Item = i32> {
    stream::iter(0..i)
}

fn nested() -> impl Stream<Item = i32> {
    let stream_of_number = into_many(5);
    let stream_of_stream_of_number = stream_of_number.map(into_many);
    let flat_stream_of_number = stream_of_stream_of_number.flatten();

    // Returns: 0, 0, 1, 0, 1, 2, 0, 1, 2, 3
    flat_stream_of_number
}
use futures::{stream, Stream}; // 0.1.25

fn into_many(i: i32) -> impl Stream<Item = i32, Error = ()> {
    stream::iter_ok(0..i)
}

fn nested() -> impl Stream<Item = i32, Error = ()> {
    let stream_of_number = into_many(5);
    let stream_of_stream_of_number = stream_of_number.map(into_many);
    let flat_stream_of_number = stream_of_stream_of_number.flatten();

    // Returns: 0, 0, 1, 0, 1, 2, 0, 1, 2, 3
    flat_stream_of_number
}

是的,这就是问题所在。通过stream::map使用bytes::from将您的流从一种类型转换为另一种类型:

use bytes::Bytes; // 0.4.11
use futures::Stream; // 0.1.25 

fn example(stream: ByteStream, mut builder: HttpResponseBuilder) {
    builder.streaming(stream.map(Bytes::from));
}
 类似资料:
  • 下面的Rust代码试图在一个数组中存储一个零参数闭包,并调用函数。 链接到 Rust Playground。不幸的是,它不会编译: 错误[E0277]:

  • 我尝试在我的DB中更新一个表“image”并插入blob类型。 ImageClass: Hibernate用户映射: 利布: 错误:

  • 已挂, 查看状态变更为业务面试-未录用 -------------------------------------------------------- 1. 深挖项目,需要对自己的项目和类似开源项目,用到的框架技术有深入了解 2. Redis 3. MySQL 4. 消息队列 5. GC相关 反问 拷打约50分钟 面试官水平很高,也会根据你的回答引导,可惜自己太菜了,很多没回答上来

  • 有什么方法可以简化这段代码吗?我正好有一个白色的一块,想要得到它的位置 代码: 瓦片类: 件类:

  • 我最近开始使用在我的android应用程序上播放流链接,比如和文件,但当我试图播放文件,以下消息开始出现在Android logcat中: 源错误。com.google.android.exoplayer2.source.无法识别的输入格式异常:输入不以#EXTM3U头开始。在com.google.android.exoplayer2.source.hls.playlist.HlsPlaylist

  • 我们使用New Relic从生产环境中收集性能信息,并添加了一些自定义工具。在Web事务屏幕中,我们可以看到哪些事务使用的时间最多,甚至可以深入查看最慢事务的特定跟踪。一切正常。然而,最慢的事务并不总是代表整个操作。它们通常是边缘情况(缓存过期、更新后暖化请求等)。 我很想以更聚合的方式看到我们可以在跟踪详细信息中看到的相同数据。最好也是在跟踪详细信息中使用的分层方式(尽管这并不总是可能的,因为多