当前位置: 首页 > 知识库问答 >
问题:

如何从递归生成值的流中创建akka流源?

赵禄
2023-03-14

我需要遍历一个形状像树的API。例如,目录结构或讨论线程。它可以通过以下流程进行建模:

type ItemId = Int
type Data = String
case class Item(data: Data, kids: List[ItemId])

def randomData(): Data = scala.util.Random.alphanumeric.take(2).mkString 

// 0 => [1, 9]
// 1 => [10, 19]
// 2 => [20, 29]
// ...
// 9 => [90, 99]
// _ => []
// NB. I don't have access to this function, only the itemFlow.
def nested(id: ItemId): List[ItemId] =
  if (id == 0) (1 to 9).toList
  else if (1 <= id && id <= 9) ((id * 10) to ((id + 1) * 10 - 1)).toList
  else Nil

val itemFlow: Flow[ItemId, Item, NotUsed] = 
  Flow.fromFunction(id => Item(randomData, nested(id)))

如何遍历这些数据?我的工作如下:

import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream._
import akka.stream.scaladsl._

import scala.concurrent.Await
import scala.concurrent.duration.Duration

implicit val system = ActorSystem()
implicit val materializer = ActorMaterializer()

val loop = 
  GraphDSL.create() { implicit b =>
    import GraphDSL.Implicits._

    val source = b.add(Flow[Int])
    val merge  = b.add(Merge[Int](2))
    val fetch  = b.add(itemFlow) 
    val bcast  = b.add(Broadcast[Item](2))

    val kids   = b.add(Flow[Item].mapConcat(_.kids))
    val data   = b.add(Flow[Item].map(_.data))

    val buffer = Flow[Int].buffer(100, OverflowStrategy.dropHead)

    source ~> merge ~> fetch           ~> bcast ~> data
              merge <~ buffer <~ kids  <~ bcast

    FlowShape(source.in, data.out)
  }

val flow = Flow.fromGraph(loop)


Await.result(
  Source.single(0).via(flow).runWith(Sink.foreach(println)),
  Duration.Inf
)

system.terminate()

然而,由于我使用的是带有缓冲区的流,所以流永远不会完成。

上游完成且缓冲元件已排空时完成

流缓冲器

我多次阅读了图表周期、活跃度和死锁部分,但仍在努力寻找答案。

这将创建一个活动锁:

import java.util.concurrent.atomic.AtomicInteger

def unfold[S, E](seed: S, flow: Flow[S, E, NotUsed])(loop: E => List[S]): Source[E, NotUsed] = {
  // keep track of how many element flows, 
  val remaining = new AtomicInteger(1) // 1 = seed

  // should be > max loop(x)
  val bufferSize = 10000

  val (ref, publisher) =
    Source.actorRef[S](bufferSize, OverflowStrategy.fail)
      .toMat(Sink.asPublisher(true))(Keep.both)
      .run()

  ref ! seed

  Source.fromPublisher(publisher)
    .via(flow)
    .map{x =>
      loop(x).foreach{ c =>
        remaining.incrementAndGet()
        ref ! c
      }
      x
    }
    .takeWhile(_ => remaining.decrementAndGet > 0)
}

编辑:我添加了一个git repo来测试你的解决方案https://github.com/MasseGuillaume/source-unfold

共有3个答案

赵嘉悦
2023-03-14

啊,阿卡溪流中循环的乐趣。我有一个非常相似的问题,我用一种非常复杂的方式解决了。也许它会对你有所帮助。

黑客解决方案:

  // add a graph stage that will complete successfully if it sees no element within 5 seconds
  val timedStopper = b.add(
    Flow[Item]
      .idleTimeout(5.seconds)
      .recoverWithRetries(1, {
        case _: TimeoutException => Source.empty[Item]
      }))

  source ~> merge ~> fetch ~> timedStopper ~> bcast ~> data
  merge <~ buffer <~ kids <~ bcast

这样做的目的是,在最后一个元素通过timedStopper阶段5秒后,该阶段成功完成流。这是通过使用idleTimeout实现的,它使用TimeoutException使流失败,然后使用recoverWithRetries将失败转化为成功完成。(我确实提到了它是黑客的)。

如果元素之间的间隔时间可能超过5秒,或者在流“实际”完成和Akka接收之间无法等待很长时间,那么这显然是不合适的。谢天谢地,这两个都不是我们所关心的问题,在这种情况下,它实际上运行得很好!

非黑客解决方案

不幸的是,我能想到的唯一不通过超时作弊的方法非常非常复杂。

基本上,你需要能够跟踪两件事:

  • 缓冲区中是否仍有任何元素,或正在发送到缓冲区中

如果且仅当这两个问题的答案都是否定的,则完成流程。本地Akka构建块可能无法处理这个问题。然而,一个定制的图形阶段可能会出现。一个选项可能是编写一个替代合并,让它了解缓冲区内容的方法,或者让它跟踪接收到的ID和广播发送到缓冲区的ID。问题是,在最好的时候,定制的图形阶段写起来并不特别愉快,更不用说在这样的阶段之间混合逻辑了。

警告

Akka流不能很好地处理循环,尤其是它们如何计算完成。因此,这可能不是你遇到的唯一问题。

例如,我们在使用非常相似的结构时遇到的一个问题是,源中的失败被视为流成功完成,并实现了成功的Future。问题是,默认情况下,失败的阶段将失败其下行流,但取消其上行流(这算作这些阶段的成功完成)。对于像您这样的循环,结果是一场竞赛,因为取消向下传播一个分支,但失败向下传播另一个分支。您还需要检查如果接收器出错会发生什么;根据广播的取消设置,取消可能不会向上传播,源将愉快地继续拉入元素。

最后一个选择是:完全避免使用流处理递归逻辑。在一个极端情况下,如果有任何方法可以让您编写一个单尾递归方法,一次提取所有嵌套项并将其放入流阶段,这将解决您的问题。另一方面,我们正在认真考虑去Kafka为我们自己的系统排队。

臧亦
2023-03-14

我通过写自己的GraphStage解决了这个问题。

import akka.NotUsed
import akka.stream._
import akka.stream.scaladsl._
import akka.stream.stage.{GraphStage, GraphStageLogic, OutHandler}

import scala.concurrent.ExecutionContext

import scala.collection.mutable
import scala.util.{Success, Failure, Try}

import scala.collection.mutable

def unfoldTree[S, E](seeds: List[S], 
                     flow: Flow[S, E, NotUsed],
                     loop: E => List[S],
                     bufferSize: Int)(implicit ec: ExecutionContext): Source[E, NotUsed] = {
  Source.fromGraph(new UnfoldSource(seeds, flow, loop, bufferSize))
}

object UnfoldSource {
  implicit class MutableQueueExtensions[A](private val self: mutable.Queue[A]) extends AnyVal {
    def dequeueN(n: Int): List[A] = {
      val b = List.newBuilder[A]
      var i = 0
      while (i < n) {
        val e = self.dequeue
        b += e
        i += 1
      }
      b.result()
    }
  }
}

class UnfoldSource[S, E](seeds: List[S],
                         flow: Flow[S, E, NotUsed],
                         loop: E => List[S],
                         bufferSize: Int)(implicit ec: ExecutionContext) extends GraphStage[SourceShape[E]] {

  val out: Outlet[E] = Outlet("UnfoldSource.out")
  override val shape: SourceShape[E] = SourceShape(out)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) with OutHandler {  
    // Nodes to expand
    val frontier = mutable.Queue[S]()
    frontier ++= seeds

    // Nodes expanded
    val buffer = mutable.Queue[E]()

    // Using the flow to fetch more data
    var inFlight = false

    // Sink pulled but the buffer was empty
    var downstreamWaiting = false

    def isBufferFull() = buffer.size >= bufferSize

    def fillBuffer(): Unit = {
      val batchSize = Math.min(bufferSize - buffer.size, frontier.size)
      val batch = frontier.dequeueN(batchSize)
      inFlight = true

      val toProcess =
        Source(batch)
          .via(flow)
          .runWith(Sink.seq)(materializer)

      val callback = getAsyncCallback[Try[Seq[E]]]{
        case Failure(ex) => {
          fail(out, ex)
        }
        case Success(es) => {
          val got = es.size
          inFlight = false
          es.foreach{ e =>
            buffer += e
            frontier ++= loop(e)
          }
          if (downstreamWaiting && buffer.nonEmpty) {
            val e = buffer.dequeue
            downstreamWaiting = false
            sendOne(e)
          } else {
            checkCompletion()
          }
          ()
        }
      }

      toProcess.onComplete(callback.invoke)
    }
    override def preStart(): Unit = {
      checkCompletion()
    }

    def checkCompletion(): Unit = {
      if (!inFlight && buffer.isEmpty && frontier.isEmpty) {
        completeStage()
      }
    } 

    def sendOne(e: E): Unit = {
      push(out, e)
      checkCompletion()
    }

    def onPull(): Unit = {
      if (buffer.nonEmpty) {
        sendOne(buffer.dequeue)
      } else {
        downstreamWaiting = true
      }

      if (!isBufferFull && frontier.nonEmpty) {
        fillBuffer()
      }
    }

    setHandler(out, this)
  }
}
杜禄
2023-03-14

未完成的原因

我不认为流永远不会完成的原因是由于“使用带有缓冲区的流”。实际原因,类似于这个问题,是使用默认参数eagerClose=False合并的事实正在等待source缓冲区在它(合并)完成之前完成。但是缓冲区正在等待合并完成。所以合并正在等待缓冲区,缓冲区正在等待合并。

关闭合并

您可以在创建合并时设置ackereclose=True。但不幸的是,使用“急切关闭”可能会导致一些子项ItemId值永远无法被查询。

间接溶液

如果为树的每一层具体化一个新流,那么可以将递归提取到流之外。

您可以使用itemFlow构造查询函数:

val itemQuery : Iterable[ItemId] => Future[Seq[Data]] = 
  (itemIds) => Source.apply(itemIds)
                     .via(itemFlow)
                     .runWith(Sink.seq[Data])

现在可以将此查询函数包装在递归助手函数中:

val recQuery : (Iterable[ItemId], Iterable[Data]) => Future[Seq[Data]] = 
  (itemIds, currentData) => itemQuery(itemIds) flatMap { allNewData =>
      val allNewKids = allNewData.flatMap(_.kids).toSet

      if(allNewKids.isEmpty)
        Future.successful(currentData ++ allNewData)
      else
        recQuery(allNewKids, currentData ++ data)
  }

创建的流的数量将等于树的最大深度。

不幸的是,由于涉及未来,这个递归函数不是尾部递归的,如果树太深,可能会导致“堆栈溢出”。

 类似资料:
  • 当接收到消息时,它将运行,并将接收到的每个项发布到。 我怎么能那么做? 以防万一它可能会添加更多选项,请注意,另一个代码块是的Websocket处理程序。

  • 在Akka Streams的早期版本中,返回了一个的,可以具体化为。 在Akka Streams 2.4中,我看到返回一个——我不清楚如何使用它。我需要应用于流的转换必须使整个可用,所以我不能只

  • 在Java中,可以使用轻松地生成无限流。但是,我需要生成一个最终完成的流。 想象一下,例如,我想要一个目录中所有文件的流。文件的数量可能很大,因此我无法预先收集所有数据并从中创建流(通过)。我需要一段一段地生成序列。但是流显然会在某个时候完成,而像(或)这样的终端操作符需要对其进行操作,因此在这里不合适。 有没有什么合理的简单方法可以在Java中做到这一点,而不用我自己实现整个流接口呢? 我可以想

  • 我们有以下架构 SQS(源)->SQS轮询器->我们的业务逻辑->Sink,它从SQS中删除消息。 这是一个akka流(我们的业务逻辑有多个阶段)。 现在我们希望通过添加HTTP服务器(而不是Akka HTTP)来扩展这个体系结构。 现在我们的服务也有了路径 我认为https://doc.akka.io/docs/akka/2.5/stream/operators/source/queue.htm

  • 问题内容: 我想使用Java 8递归列出计算机上的所有文件。 Java 8提供了一种返回所有文件和目录但不递归的方法。如何使用它来获取完整的文件递归列表(不使用变异集合)? 我尝试了下面的代码,但仅深入了一层: 而且使用不会编译(不确定原因)… 注意:我对涉及FileVisitors或外部库的解决方案不感兴趣。 问题答案: 通过递归遍历文件系统生成路径路径流的新API是。 如果您真的想递归地生成流

  • 并使用不编译(不确定原因)... 注意:我对涉及FileVisitors或外部库的解决方案不感兴趣。