当前位置: 首页 > 知识库问答 >
问题:

AKKA HTTP源流与未来

孙俊彦
2023-03-14

我的服务代码如下所示,

def getShards(id: String, shards: Int) = {
  def getShardsInternal(shardNo: Int, shards: Future[Array[Byte]]): Future[Array[Byte]] = {
    if (shardNo == 0) shards
    else getShardsInternal(shardNo - 1, shards.flatMap(x => Database.ShardModel.find(id, shardNo)))
  }
  getShardsInternal(shards, Future.successful(Array()))
}

在我的AKKA HTTP路由中,我尝试从返回的未来构建,如下所示,

def getAsset = get {
  pathPrefix("asset") {
    parameters('id) { id =>
      complete {
        val f = mediaService.getMetadata(id).flatMap { x =>
          mediaService.getShards(id, x.shards)
        }
        Source.fromFuture(f)
      }
    }
  }
}

我不确定source.fromfuture如何提交给响应。被传递的未来实质上是一系列预期按顺序执行的平面映射的未来。但是,我不相信这会作为分块字节流返回到客户端

get {
  pathPrefix("asset") {
    parameters('id) { id =>
      complete {
        Source.fromFuture {
          Future.successful("Hello".getBytes()).flatMap(x => Future.successful("World".getBytes()))
        }
      }
    }
  }
}
[72,101,108,108,111,32,87,111,114,108,100]

然而,我只得到最后一个未来的结果如下,

[[87,111,114,108,100]]

亲切地问候Meeraj

共有1个答案

佴飞驰
2023-03-14

源[Array[Byte],NotUsed]转换为源[ByteString,NotUsed],并将HttpEntityContentTypes:

import akka.util.ByteString

def getAsset = get {
  pathPrefix("asset") {
    parameters('id) { id =>
      val f = mediaService.getMetadata(id).flatMap { x =>
        mediaService.getShards(id, x.shards)
      }
      val source = Source.fromFuture(f).map(ByteString.apply)
      complete(HttpEntity(ContentTypes.`application/octet-stream`, source))
    }
  }
}

这里我使用application/octet-stream作为示例。由于您正在对视频进行流式传输,因此可能需要使用ContentType.Binary以及适当的媒体类型。例如:

complete(HttpEntity(ContentType.Binary(MediaTypes.`video/mpeg`), source))    

处理您的注释和更新时,您似乎想要在getshards中连接未来的结果:正如您所发现的,flatmap不会这样做。改用future.reduceLeft:

def getShards(id: String, shards: Int): Future[Array[Byte]] = {
  val futures = (1 to shards).map(Database.ShardModel.find(id, _))
  Future.reduceLeft(futures)(_ ++ _)
}
def getShards(id: String, shards: Int): Future[List[Array[Byte]]] = {
  val futures = (1 to shards).map(Database.ShardModel.find(id, _)).toList
  Future.sequence(futures)
}

def getAsset = get {
  pathPrefix("asset") {
    parameters('id) { id =>
      val f = mediaService.getMetadata(id).flatMap { x =>
        mediaService.getShards(id, x.shards)
      }
      val source =
        Source.fromFuture(f)
              .flatMapConcat(Source.apply)
              .map(ByteString.apply)
      complete(HttpEntity(/* a content type */, source))
    }
  }
}
 类似资料:
  • 我在Flink中构建了一个工作流,它由一个自定义源、一系列地图/平面地图和一个接收器组成。 我的自定义源的run()方法遍历存储在文件夹中的文件,并通过上下文的collect()方法收集每个文件的名称和内容(我有一个自定义对象,它将此信息存储在两个字段中)。 然后,我有一系列地图/平面图来转换这些对象,然后使用自定义接收器将其打印到文件中。在Flink的Web UI中生成的执行图如下所示: 我有一

  • 我正在遵循入门指南[1],但是我已经从配置设置中删除了MySQL和analytics的内容,因为我不打算使用任何分析函数。但是,scdf服务后来崩溃了,因为没有配置数据源。 好的,所以似乎仍然需要在scdf-config-kafka.yml[2]中配置数据源(尽管从阅读文档来看,我认为它只用于分析内容)。 但为了什么?数据源用于持久化Kafka消息,还是在节点之间建立云流消息? 我找不到任何关于大

  • 我使用Spring Webflux、Spring Data和Project Reactor实现非阻塞I/O(Spring Boot 2.0.0.M7)。 我的目标是创建一个类似股票报价器的API,允许客户机根据某些标准从endpoint请求所有资源,并接收在初始请求后创建的新资源。Reactive MongoDB是支持存储。基本的实现如下所示。 显然,这只是返回当前可用的所有,然后关闭连接,并且不

  • 问题内容: 我有一个具有以下依赖关系的层次结构: 里面还有其他依赖项,但我不会走那么远,因为看来我的错误发生的时间早得多。与该依赖关系图相对应,我具有以下Spring配置: 在应用程序的顶层,我将运行: 在进行调试时,我发现初始化成功。。。有些。在完全解析并注入之前,我得到以下信息: 我一直在追溯它,这是我的堆栈跟踪中唯一的类。 删除Java和XML中的所有三个构造函数参数都可以解决此问题,但是引

  • 我目前有一个简单的TextMessage Source,它向我的Websocket客户端流发送消息,如下所示: 因此,我目前有一个Source类型的源代码[TextMessage.Strict,NotUsed],但我想使用注释掉的代码,其中我有一个ActorRef作为我的源代码。 我试过这个: 因此,当我使用ActorRef作为我的源时,我很难尝试将其放入图中。我得到了这个编译时错误: 类型不匹配

  • 需要使用 Jax-Rs Jersey-1.x 资源提供文件上传功能。我想知道是否最好使用“File”作为参数或“InputStream”创建方法? 资源将接受内容类型,并且不绑定到表单数据。它将需要处理小到大(最多千兆)的文件。 想知道使用<code>InputStream