当前位置: 首页 > 知识库问答 >
问题:

Akka流配料

阳凌
2023-03-14

学习Akka Streams。我有一个记录流,每个时间单位很多,已经按时间排序(来自Slick),我想通过检测时间步长何时变化来将它们批处理为时间组。

实例

case class Record(time: Int, payload: String)

如果传入流是

Record(1, "a")
Record(1, "k")
Record(1, "k")
Record(1, "a")
Record(2, "r")
Record(2, "o")
Record(2, "c")
Record(2, "k")
Record(2, "s")
Record(3, "!")
...

我想把它转换成

Batch(1, Seq("a","k","k","a"))
Batch(2, Seq("r","o","c","k","s"))
Batch(3, Seq("!"))
...

到目前为止,我只发现了按固定数量的记录进行分组,或者拆分成许多子流,但从我的角度来看,我不需要多个子流。

更新:我发现了批处理,但它看起来更关心背压,而不仅仅是一直批处理。

共有1个答案

史修明
2023-03-14

statefulMapConcat是Akka Streams库中的多工具。

val records =
  Source(List(
    Record(1, "a"),
    Record(1, "k"),
    Record(1, "k"),
    Record(1, "a"),
    Record(2, "r"),
    Record(2, "o"),
    Record(2, "c"),
    Record(2, "k"),
    Record(2, "s"),
    Record(3, "!")
  ))
  .concat(Source.single(Record(0, "notused"))) // needed to print the last element

records
  .statefulMapConcat { () =>
    var currentTime = 0
    var payloads: Seq[String] = Nil

    record =>
      if (record.time == currentTime) {
        payloads = payloads :+ record.payload
        Nil
      } else {
        val previousState = (currentTime, payloads)
        currentTime = record.time
        payloads = Seq(record.payload)
        List(previousState)
      }
  }
  .runForeach(println)

运行上述命令将打印以下内容:

(0,List())
(1,List(a, k, k, a))
(2,List(r, o, c, k, s))
(3,List(!))

您可以调整示例以打印Batch对象

 类似资料:
  • 当接收到消息时,它将运行,并将接收到的每个项发布到。 我怎么能那么做? 以防万一它可能会添加更多选项,请注意,另一个代码块是的Websocket处理程序。

  • 根据文档[1],我一直试图在Akka stream中并行化一个流,但由于某些原因,我没有得到预期的结果。 我遵循了留档中列出的步骤,我不认为我错过了什么。然而,我的流的计算都是按顺序一个接一个地发生的。 我错过了什么? [1] https://doc.akka.io/docs/akka/current/stream/stream-parallelism.html 示例输出 我希望看到两个计算同时进

  • 我不确定这是否是正确的方法,或者即使这是一个好的方法,或者我是否应该使用一个actor与路由交互,使用ask模式,然后在actor内部流式处理所有内容。 有什么想法吗?

  • 我想利用一个简单的流从http服务收集一些额外的数据,并用这些结果来增强我的数据对象。下面说明了这一想法: 我有一个问题,要理解流的本质和流内部的物化/未来之间的机制和区别。 以下想法并没有向我解释: null

  • 我创建了一个Akka流,它有一个简单的,和。有了这个,我可以很容易地通过它发送元素。现在我想更改这个流,以便返回一个。根据我想更改。 有可能创造出这样的建筑吗?