我试图将一个流媒体源缓存到磁盘上,同时也将其作为HttpResponse
发送出去,也就是说,我有一个源[ByteString,]
,我想把它交给HttpEntity
,但我还想将相同的数据运行到文件IO中。toPath
sink。
|-> FileIO.toPath
Source[ByteString,_] ->|
|-> HttpEntity(contentType, Source[ByteString,_]
似乎我应该使用Broadcast
进行扇出,但从描述来看,它写入两个接收器,而FileIO。toPath
是一个接收器,HttpEntity
需要一个源
。
还有Source.fromGraph
,它看起来像是从GraphStage创建源,例如Broadcast
阶段,但我不太清楚如何将FileIO
接收器放在那里。
您可以使用alsoTo
:
val originalSource: Source[ByteString, _] = ???
val cachedSource: Source[ByteString, _] = originalSource.alsoTo(FileIO.toPath(/*...*/))
val entity = HttpEntity(contentType, cachedSource)
我有一个Akka Streams,我想根据谓词将其分成两个源。 例如。有一个源(类型被有意简化): 还有两种方法: 我希望能够拆分根据谓词,并将右侧部分传递给方法,左侧部分传递给方法。 我尝试使用拆分器,但它需要s结尾。
在一个HTTP/2的连接中, 流是服务器与客户端之间用于帧交换的一个独立双向序列. 流有几个重要的特点: 一个HTTP/2连接可以包含多个并发的流, 各个端点从多个流中交换frame 流可以被客户端或服务器单方面建立, 使用或共享 流也可以被任意一方关闭 frames在一个流上的发送顺序很重要. 接收方将按照他们的接收顺序处理这些frame. 特别是HEADERS和DATA frame的顺序, 在
我正在使用来自我无法控制的java库的数据发布者。发布者库使用典型的回调设置;在库代码的某个地方(库是java,但我将在scala中描述简洁): 库的用户需要编写一个实现方法的类,并将其传递给,库代码如下所示: 有自己的内部线程我无法控制,以及随附的数据缓冲区,即每当有另一个对象要使用时调用。 所以,我的问题是:如何编写一个层,将原始库模式转换/转换为akka流源对象? 提前谢谢你。
当接收到消息时,它将运行,并将接收到的每个项发布到。 我怎么能那么做? 以防万一它可能会添加更多选项,请注意,另一个代码块是的Websocket处理程序。
我想要一个以给定的时间间隔计算函数并发出其输出的源。作为一种变通方法,我可以通过,但还没有找到更干净的方法。理想情况下,我会有 有什么想法吗?
这看起来不可思议,但我找不到源代码存储库。主github repo包含一个akka stream dir,但不包含当前的发布源。 目前,我设法通过发布:http://search.maven.org/remotecontent?filepath=com/typesafe/akka/akka-stream-experimental_2.11/2.0.1/akka-stream-experimenta