当前位置: 首页 > 知识库问答 >
问题:

Akka流,源于函数?

左丘昕
2023-03-14

我想要一个以给定的时间间隔计算函数并发出其输出的源。作为一种变通方法,我可以通过代码来实现。排队提供,但还没有找到更干净的方法。理想情况下,我会有

def myFunction() = ....                     // function with side-effects 
Source.tick(1.second, 1.second, myFunction) // myFunction is evaluated at every tick

有什么想法吗?

共有2个答案

孙岳
2023-03-14

我想,油门就是你需要的。完全可运行的示例,将Source应用于iterable,它使用next()中的函数

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.ThrottleMode.Shaping
import akka.stream.scaladsl.Source

import scala.concurrent.duration._

implicit val system = ActorSystem()
implicit val materializer = ActorMaterializer()
var i = 0

def myFunction(): Int = {
  i = i + 1
  i
}

import scala.collection.immutable.Iterable

val x: Iterable[Int] = new Iterable[Int] {
  override def iterator: Iterator[Int] =
    new Iterator[Int] {
      override def hasNext: Boolean = true

      override def next(): Int = myFunction()
    }
}
Source(x).throttle(1, 1.second, 1, Shaping).runForeach(println)

throttle参数:节流源,每1秒1个元素,最大突发=1,在发出消息之前暂停,以满足节流速率(这就是成形)。

燕鸿文
2023-03-14

最干净的方法可能是使用map

Source.tick(1.second, 1.second, NotUsed).map(_ ⇒ myFunction())
 类似资料:
  • 当接收到消息时,它将运行,并将接收到的每个项发布到。 我怎么能那么做? 以防万一它可能会添加更多选项,请注意,另一个代码块是的Websocket处理程序。

  • 这看起来不可思议,但我找不到源代码存储库。主github repo包含一个akka stream dir,但不包含当前的发布源。 目前,我设法通过发布:http://search.maven.org/remotecontent?filepath=com/typesafe/akka/akka-stream-experimental_2.11/2.0.1/akka-stream-experimenta

  • 我的服务代码如下所示, 在我的AKKA HTTP路由中,我尝试从返回的未来构建,如下所示, 我不确定如何提交给响应。被传递的未来实质上是一系列预期按顺序执行的平面映射的未来。但是,我不相信这会作为分块字节流返回到客户端。 然而,我只得到最后一个未来的结果如下, 亲切地问候Meeraj

  • 我有一个Akka Streams,我想根据谓词将其分成两个源。 例如。有一个源(类型被有意简化): 还有两种方法: 我希望能够拆分根据谓词,并将右侧部分传递给方法,左侧部分传递给方法。 我尝试使用拆分器,但它需要s结尾。

  • 因此,一般的想法是使用来爬取页面并执行HTTP请求(想法和实现与本答案中描述的非常接近。这将创建一个,然后可以使用它(解封为多部分,拆分为各个部分,...)。 我现在的问题是的消耗可能需要一段时间(如果页面很大,解组需要一些时间,可能最后会有一些数据库请求来持久化一些数据,...)。因此,如果下游速度较慢,我希望为背压。默认情况下,下一个HTTP请求将在上一个请求完成后立即启动。 所以我的问题是: