我想要一个以给定的时间间隔计算函数并发出其输出的源。作为一种变通方法,我可以通过源代码来实现。排队
提供
,但还没有找到更干净的方法。理想情况下,我会有
def myFunction() = .... // function with side-effects
Source.tick(1.second, 1.second, myFunction) // myFunction is evaluated at every tick
有什么想法吗?
我想,油门
就是你需要的。完全可运行的示例,将Source
应用于iterable,它使用next()
中的函数:
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.ThrottleMode.Shaping
import akka.stream.scaladsl.Source
import scala.concurrent.duration._
implicit val system = ActorSystem()
implicit val materializer = ActorMaterializer()
var i = 0
def myFunction(): Int = {
i = i + 1
i
}
import scala.collection.immutable.Iterable
val x: Iterable[Int] = new Iterable[Int] {
override def iterator: Iterator[Int] =
new Iterator[Int] {
override def hasNext: Boolean = true
override def next(): Int = myFunction()
}
}
Source(x).throttle(1, 1.second, 1, Shaping).runForeach(println)
throttle
参数:节流源,每1秒1个元素,最大突发=1,在发出消息之前暂停,以满足节流速率(这就是成形
)。
最干净的方法可能是使用map
Source.tick(1.second, 1.second, NotUsed).map(_ ⇒ myFunction())
当接收到消息时,它将运行,并将接收到的每个项发布到。 我怎么能那么做? 以防万一它可能会添加更多选项,请注意,另一个代码块是的Websocket处理程序。
这看起来不可思议,但我找不到源代码存储库。主github repo包含一个akka stream dir,但不包含当前的发布源。 目前,我设法通过发布:http://search.maven.org/remotecontent?filepath=com/typesafe/akka/akka-stream-experimental_2.11/2.0.1/akka-stream-experimenta
我的服务代码如下所示, 在我的AKKA HTTP路由中,我尝试从返回的未来构建,如下所示, 我不确定如何提交给响应。被传递的未来实质上是一系列预期按顺序执行的平面映射的未来。但是,我不相信这会作为分块字节流返回到客户端。 然而,我只得到最后一个未来的结果如下, 亲切地问候Meeraj
我有一个Akka Streams,我想根据谓词将其分成两个源。 例如。有一个源(类型被有意简化): 还有两种方法: 我希望能够拆分根据谓词,并将右侧部分传递给方法,左侧部分传递给方法。 我尝试使用拆分器,但它需要s结尾。
因此,一般的想法是使用来爬取页面并执行HTTP请求(想法和实现与本答案中描述的非常接近。这将创建一个,然后可以使用它(解封为多部分,拆分为各个部分,...)。 我现在的问题是的消耗可能需要一段时间(如果页面很大,解组需要一些时间,可能最后会有一些数据库请求来持久化一些数据,...)。因此,如果下游速度较慢,我希望为背压。默认情况下,下一个HTTP请求将在上一个请求完成后立即启动。 所以我的问题是: