当前位置: 首页 > 知识库问答 >
问题:

Akka-streams-如何访问流的物化值

贝钧
2023-03-14

我正在学习与Akka溪流一起工作,并且真的很喜欢它,但是物化部分对我来说仍然有些神秘。

引用自http://doc.akka.io/docs/akka-stream-and-http-experimental/snapshot/scala/http/client-side/host-level.html#host-level-api

...通过对池客户端流具体化到的HostConnectionPool实例调用shutdown()来触发特定池的立即关闭

如何获取HostConnectionPool实例?

更重要的是,我想了解一般情况下如何访问物化值并执行某些操作或从中检索信息。

感谢预先提供的任何文档、指针或解释。

共有1个答案

曹高阳
2023-03-14

这是通过source.viamat函数实现的。从问题中提供的链接扩展代码示例:

import akka.http.scaladsl.Http.HostConnectionPool
import akka.stream.scaladsl.Keep
import akka.stream.scaladsl.RunnableGraph

val poolClientFlow = Http().cachedHostConnectionPool[Int]("akka.io")

val graph: RunnableGraph[(HostConnectionPool, Future[(Try[HttpResponse], Int)])]  =
  Source.single(HttpRequest(uri = "/") -> 42)
        .viaMat(poolClientFlow)(Keep.right)
        .toMat(Sink.head)(Keep.both)

val (pool, fut) = graph.run()

pool.shutdown()

由于source.single具体化为unitkeep.right表示保留poolClientFlow具体化为的hostconnectionpool。在.tomat函数中,keep.both表示将左边的池与flow保持距离,将右边的futuresink保持距离。

 类似资料:
  • 我正在尝试返回一个流操作的结果,在本例中为: 求和列表 取值平方 取值平方 其表示为: 若要访问我使用的值,请执行以下操作 紧随其后的是: 这需要一个阻塞调用来允许流完成,这是不可接受的: 如果我使用: 返回错误: 在本例中,收件人执行元是源,并在消息中返回以下内容: Akka流可以用来返回流中的计算值吗?唯一的替代方法是将计算值存储在DB中,或者当在以下位置计算值时将其发送到Kafka主题: ?

  • 我有以下简单的case类层次结构: 我有一个(来自一个基于Websocket的协议,已经有了编解码器)。 我想将此解复用为Foo和Baz类型的单独流,因为它们由完全不同的路径处理。 最简单的方法是什么?应该很明显,但我错过了一些东西。。。

  • 我需要能够从单独的流处理器中删除Ktable中的记录。今天我使用aggregate()并传递一个物化状态存储。在一个从“终止”主题读取的单独处理器中,我想在.transform()或不同的.gaggregate()中查询实体化状态存储,并“移除”该键/值。每次我尝试从一个单独的流处理器访问物化状态时,它都会告诉我存储没有添加到拓扑中,所以我添加它并再次运行它,然后它会告诉我它已经注册,并且出错。

  • 当接收到消息时,它将运行,并将接收到的每个项发布到。 我怎么能那么做? 以防万一它可能会添加更多选项,请注意,另一个代码块是的Websocket处理程序。

  • 关于如何解决此错误的建议,以便我可以使用最新版本的akka、akka streams和akka HTTP?谢了!