我的服务代码如下所示,
def getShards(id: String, shards: Int) = {
def getShardsInternal(shardNo: Int, shards: Future[Array[Byte]]): Future[Array[Byte]] = {
if (shardNo == 0) shards
else getShardsInternal(shardNo - 1, shards.flatMap(x => Database.ShardModel.find(id, shardNo)))
}
getShardsInternal(shards, Future.successful(Array()))
}
在我的AKKA HTTP路由中,我尝试从返回的未来构建源
,如下所示,
def getAsset = get {
pathPrefix("asset") {
parameters('id) { id =>
complete {
val f = mediaService.getMetadata(id).flatMap { x =>
mediaService.getShards(id, x.shards)
}
Source.fromFuture(f)
}
}
}
}
我不确定source.fromfuture
如何提交给响应。被传递的未来实质上是一系列预期按顺序执行的平面映射的未来。但是,我不相信这会作为分块字节流返回到客户端。
get {
pathPrefix("asset") {
parameters('id) { id =>
complete {
Source.fromFuture {
Future.successful("Hello".getBytes()).flatMap(x => Future.successful("World".getBytes()))
}
}
}
}
}
[72,101,108,108,111,32,87,111,114,108,100]
然而,我只得到最后一个未来的结果如下,
[[87,111,114,108,100]]
亲切地问候Meeraj
将源[Array[Byte],NotUsed]
转换为源[ByteString,NotUsed]
,并将HttpEntity
与ContentTypes
:
import akka.util.ByteString
def getAsset = get {
pathPrefix("asset") {
parameters('id) { id =>
val f = mediaService.getMetadata(id).flatMap { x =>
mediaService.getShards(id, x.shards)
}
val source = Source.fromFuture(f).map(ByteString.apply)
complete(HttpEntity(ContentTypes.`application/octet-stream`, source))
}
}
}
这里我使用application/octet-stream
作为示例。由于您正在对视频进行流式传输,因此可能需要使用ContentType.Binary
以及适当的媒体类型。例如:
complete(HttpEntity(ContentType.Binary(MediaTypes.`video/mpeg`), source))
处理您的注释和更新时,您似乎想要在getshards
中连接未来的结果:正如您所发现的,flatmap
不会这样做。改用future.reduceLeft
:
def getShards(id: String, shards: Int): Future[Array[Byte]] = {
val futures = (1 to shards).map(Database.ShardModel.find(id, _))
Future.reduceLeft(futures)(_ ++ _)
}
def getShards(id: String, shards: Int): Future[List[Array[Byte]]] = {
val futures = (1 to shards).map(Database.ShardModel.find(id, _)).toList
Future.sequence(futures)
}
def getAsset = get {
pathPrefix("asset") {
parameters('id) { id =>
val f = mediaService.getMetadata(id).flatMap { x =>
mediaService.getShards(id, x.shards)
}
val source =
Source.fromFuture(f)
.flatMapConcat(Source.apply)
.map(ByteString.apply)
complete(HttpEntity(/* a content type */, source))
}
}
}
我在Flink中构建了一个工作流,它由一个自定义源、一系列地图/平面地图和一个接收器组成。 我的自定义源的run()方法遍历存储在文件夹中的文件,并通过上下文的collect()方法收集每个文件的名称和内容(我有一个自定义对象,它将此信息存储在两个字段中)。 然后,我有一系列地图/平面图来转换这些对象,然后使用自定义接收器将其打印到文件中。在Flink的Web UI中生成的执行图如下所示: 我有一
我正在遵循入门指南[1],但是我已经从配置设置中删除了MySQL和analytics的内容,因为我不打算使用任何分析函数。但是,scdf服务后来崩溃了,因为没有配置数据源。 好的,所以似乎仍然需要在scdf-config-kafka.yml[2]中配置数据源(尽管从阅读文档来看,我认为它只用于分析内容)。 但为了什么?数据源用于持久化Kafka消息,还是在节点之间建立云流消息? 我找不到任何关于大
我使用Spring Webflux、Spring Data和Project Reactor实现非阻塞I/O(Spring Boot 2.0.0.M7)。 我的目标是创建一个类似股票报价器的API,允许客户机根据某些标准从endpoint请求所有资源,并接收在初始请求后创建的新资源。Reactive MongoDB是支持存储。基本的实现如下所示。 显然,这只是返回当前可用的所有,然后关闭连接,并且不
问题内容: 我有一个具有以下依赖关系的层次结构: 里面还有其他依赖项,但我不会走那么远,因为看来我的错误发生的时间早得多。与该依赖关系图相对应,我具有以下Spring配置: 在应用程序的顶层,我将运行: 在进行调试时,我发现初始化成功。。。有些。在完全解析并注入之前,我得到以下信息: 我一直在追溯它,这是我的堆栈跟踪中唯一的类。 删除Java和XML中的所有三个构造函数参数都可以解决此问题,但是引
我目前有一个简单的TextMessage Source,它向我的Websocket客户端流发送消息,如下所示: 因此,我目前有一个Source类型的源代码[TextMessage.Strict,NotUsed],但我想使用注释掉的代码,其中我有一个ActorRef作为我的源代码。 我试过这个: 因此,当我使用ActorRef作为我的源时,我很难尝试将其放入图中。我得到了这个编译时错误: 类型不匹配
需要使用 Jax-Rs Jersey-1.x 资源提供文件上传功能。我想知道是否最好使用“File”作为参数或“InputStream”创建方法? 资源将接受内容类型,并且不绑定到表单数据。它将需要处理小到大(最多千兆)的文件。 想知道使用<code>InputStream