当前位置: 首页 > 知识库问答 >
问题:

如何从Source[A]创建Akka流源[Seq[A]]

薛博赡
2023-03-14

在Akka Streams的早期版本中,groupBy返回了一个SourceSource,可以具体化为源[Seq[a]]

在Akka Streams 2.4中,我看到groupBy返回一个子流——我不清楚如何使用它。我需要应用于流的转换必须使整个Seq可用,所以我不能只映射子流(我认为)。

我已经编写了一个扩展GraphStage的类,该类通过GraphStageLogic中的可变集合进行聚合,但是否有内置功能?我是否错过了SubFlow的要点?


共有1个答案

锺离边浩
2023-03-14

最后我写了一个图形舞台

class FlowAggregation[A, B](f: A => B) extends GraphStage[FlowShape[A, Seq[A]]] {
  val in: Inlet[A] = Inlet("in")
  val out: Outlet[Seq[A]] = Outlet("out")
  override val shape = FlowShape.of(in, out)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) {

      private var counter: Option[B] = None
      private var aggregate = scala.collection.mutable.ArrayBuffer.empty[A]

      setHandler(in, new InHandler {
        override def onPush(): Unit = {
          val element = grab(in)

          counter.fold({
            counter = Some(f(element))
            aggregate += element
            pull(in)
          }) { p =>
            if (f(element) == p) {
              aggregate += element
              pull(in)
            } else {
              push(out, aggregate)
              counter = Some(f(element))
              aggregate = scala.collection.mutable.ArrayBuffer(element)
            }
          }
        }
        override def onUpstreamFinish(): Unit = {
          emit(out, aggregate)
          complete(out)
        }
      })

      setHandler(out, new OutHandler {
        override def onPull(): Unit = {
          pull(in)
        }
      })
    }
}
 类似资料:
  • 当接收到消息时,它将运行,并将接收到的每个项发布到。 我怎么能那么做? 以防万一它可能会添加更多选项,请注意,另一个代码块是的Websocket处理程序。

  • 我需要遍历一个形状像树的API。例如,目录结构或讨论线程。它可以通过以下流程进行建模: 如何遍历这些数据?我的工作如下: 然而,由于我使用的是带有缓冲区的流,所以流永远不会完成。 上游完成且缓冲元件已排空时完成 流缓冲器 我多次阅读了图表周期、活跃度和死锁部分,但仍在努力寻找答案。 这将创建一个活动锁: 编辑:我添加了一个git repo来测试你的解决方案https://github.com/Ma

  • 我们有以下架构 SQS(源)->SQS轮询器->我们的业务逻辑->Sink,它从SQS中删除消息。 这是一个akka流(我们的业务逻辑有多个阶段)。 现在我们希望通过添加HTTP服务器(而不是Akka HTTP)来扩展这个体系结构。 现在我们的服务也有了路径 我认为https://doc.akka.io/docs/akka/2.5/stream/operators/source/queue.htm

  • 我正在尝试使用akka流传输一个文件,在将流的结果提取到Future[String]时遇到了一个小问题: 我得到一个编译错误: 任何人都可以帮助我了解我做错了什么,以及我需要做些什么来提取流的结果?

  • 我需要创建一个具有以下接口的函数: 我的问题是,我不知道如何定义符合上述接口的流。 当我做这种事的时候 结果类型为Flow[Item,OtherItem,NotUsed]。到目前为止,我还没有在Akka文档中找到任何东西。还有akka上的功能。流动scaladsl。流只提供“未使用”而不是控制。如果有人能给我指出正确的方向那就太好了。 一些背景:我需要设置几个只在转换部分区分的管道。这些管道是主流

  • 问题内容: 是否可以从源代码修改或创建配置文件。我正在使用远程创建一些客户端/服务器体系结构。我要实现的功能是使用以下命令启动客户端应用程序:主机/端口,并且在尚无配置文件时创建一个可满足命令行args的功能。 配置并不复杂。我想从源端口更改(最终是主机,现在无论如何都为主机测试)以使其自动化,这样我就可以通过将多个客户端传递给主函数来运行多个客户端。 问题答案: 是的,您可以在代码中修改或创建配