根据文档[1],我一直试图在Akka stream中并行化一个流,但由于某些原因,我没有得到预期的结果。
我遵循了留档中列出的步骤,我不认为我错过了什么。然而,我的流的计算都是按顺序一个接一个地发生的。
我错过了什么?
[1] https://doc.akka.io/docs/akka/current/stream/stream-parallelism.html
import akka.actor.ActorSystem
import akka.stream.FlowShape
import akka.stream.scaladsl.{Balance, Flow, GraphDSL, Merge, Source}
object ScalaParallell extends App {
implicit val system = ActorSystem("QuickStart")
def longRunningComputation(x: Int): Int = {
println(s"Computing 1 ${x}")
Thread.sleep(10000)
println(s"Computation 1 ${x} done")
x
}
def longRunningComputation2(x: Int): Int = {
println(s"Computing 2 ${x}")
Thread.sleep(10000)
println(s"Computation 2 ${x} done")
x
}
val processor: Flow[Int, Int, NotUsed] =
Flow.fromGraph(GraphDSL.create() { implicit b =>
import GraphDSL.Implicits._
// prepare graph elements
val balance = b.add(Balance[Int](2))
val merge = b.add(Merge[Int](2))
val f = Flow[Int].map(longRunningComputation)
val f2 = Flow[Int].map(longRunningComputation2)
// connect the graph
balance.out(0) ~> f.async ~> merge.in(0)
balance.out(1) ~> f2.async ~> merge.in(1)
// expose ports
FlowShape(balance.in, merge.out)
})
// Wire it all up.
val xs = List(1,2,3)
val source: Source[Int, NotUsed] = Source(xs)
source.via(processor).runForeach(println)
Thread.sleep(5000)
}
示例输出
Computing 2 1
Computation 2 1 done
Computing 2 2
1
Computation 2 2 done
Computing 2 3
2
Computation 2 3 done
3
我希望看到两个计算同时进行。例如:
Computing 1 1
Computing 1 2
Computation 1 2 done
Computing 1 3
Computation 1 1 done
Computing 2 4
1
2
..
尝试删除线程。在
,然后将longRunningComputation
和longRunningComputation2
中睡眠xs
设置为更长的值,例如1到100
,这样你就可以观察并行处理了。不知道为什么,但是阻塞线程。在阿克卡,睡眠肯定被认为是反模式的
学习Akka Streams。我有一个记录流,每个时间单位很多,已经按时间排序(来自Slick),我想通过检测时间步长何时变化来将它们批处理为时间组。 实例 如果传入流是 我想把它转换成 到目前为止,我只发现了按固定数量的记录进行分组,或者拆分成许多子流,但从我的角度来看,我不需要多个子流。 更新:我发现了,但它看起来更关心背压,而不仅仅是一直批处理。
当接收到消息时,它将运行,并将接收到的每个项发布到。 我怎么能那么做? 以防万一它可能会添加更多选项,请注意,另一个代码块是的Websocket处理程序。
我不确定这是否是正确的方法,或者即使这是一个好的方法,或者我是否应该使用一个actor与路由交互,使用ask模式,然后在actor内部流式处理所有内容。 有什么想法吗?
我想利用一个简单的流从http服务收集一些额外的数据,并用这些结果来增强我的数据对象。下面说明了这一想法: 我有一个问题,要理解流的本质和流内部的物化/未来之间的机制和区别。 以下想法并没有向我解释: null
我有一个相当复杂的过程,需要几个层次的嵌套for循环。 只针对一组特定的条件执行操作。换句话说: