当前位置: 首页 > 知识库问答 >
问题:

在Akka Http中从源发送元素到元素

钱锦
2023-03-14

我正在开发一个使用Akka Http和Akka流的客户机-服务器应用程序。主要思想是服务器必须使用来自Akka Streams的源来提供http响应。

问题是服务器在向客户机发送第一条消息之前积累了一些元素。但是,我需要服务器在源生成新元素时立即发送元素到元素。

case class Example(id: Long, txt: String, number: Double)

object MyJsonProtocol extends SprayJsonSupport with DefaultJsonProtocol {
  implicit val exampleFormat = jsonFormat3(Test)
}

class BatchIterator(batchSize: Int, numberOfBatches: Int, pause: FiniteDuration) extends Iterator[Array[Test]]{

  val range = Range(0, batchSize*numberOfBatches).toIterator
  val numberOfBatchesIter = Range(0, numberOfBatches).toIterator

  override def hasNext: Boolean = range.hasNext

  override def next(): Array[Test] = {
    println(s"Sleeping for ${pause.toMillis} ms")
    Thread.sleep(pause.toMillis)
    println(s"Taking $batchSize elements")
    Range(0, batchSize).map{ _ =>
      val count = range.next()
      Test(count, s"Text$count", count*0.5)
    }.toArray
  }
}

object Server extends App {
  import MyJsonProtocol._
  implicit val jsonStreamingSupport: JsonEntityStreamingSupport = EntityStreamingSupport.json()
      .withFramingRenderer(
        Flow[ByteString].intersperse(ByteString(System.lineSeparator))
      )

  implicit val system = ActorSystem("api")
  implicit val materializer = ActorMaterializer()
  implicit val executionContext = system.dispatcher

  def fetchExamples(): Source[Array[Test], NotUsed] = Source.fromIterator(() => new BatchIterator(5, 5, 2 seconds))

  val route =
    path("example") {
      complete(fetchExamples)
    }

  val bindingFuture = Http().bindAndHandle(route, "localhost", 9090)
  println("Server started at localhost:9090")
  StdIn.readLine()
  bindingFuture.flatMap(_.unbind()).onComplete(_ ⇒ system.terminate())
}
curl --no-buffer localhost:9090/example

我同时获得所有元素,而不是每2秒接收一个元素。

有什么想法可以“强制”服务器发送每个元素,因为它是从源头出来的吗?

共有1个答案

温亮
2023-03-14

终于,我找到了解决办法。问题是源是同步的...因此解决方案只是调用函数async

complete(fetchExamples.async)
 类似资料:
  • 问题内容: 我正在使用量角器进行有角度的端到端aka 测试。 为了将键发送到元素,我使用: 我如何发送像这样的组合键? 我在github上搜索了量角器源代码,但是没有找到相关的示例。 问题答案: 在Linux和Windows中是完全可能的,但在OSX中是不可能的 还有一个非元素的变体:

  • 我正在尝试简单地登录到这个页面来访问LexisNexis。下面是我的代码: 下面是html源代码:

  • 这是HTML 这是我的代码 < code>WebDriverWait(浏览器,30)。直到(EC . presence _ of _ element _ located((By。CLASS_NAME,' MCE-textbox MCE-ABS-layout-item MCE-last '))。send_text('任何东西') 我尝试了几乎所有的方法(ID、xpath、tag_name、css选择

  • 如何从Akka HTTP路由向Akka Sink发送元素/消息?我的HTTP路由仍然需要返回正常的HTTP响应。 我想这需要一个支流/枢纽。正常的HTTP路由是来自HttpRequest的流- 下面是一个非常简单的单路由akka http应用程序。为了简单起见,我使用了一个简单的println水槽。我的生产用例显然将涉及一个不那么琐碎的水槽。 编辑:或者在使用低级akka http API时,如何

  • 下面我有以下数据。 所以,我不知道为什么UDF可以使用int而不能使用CharArray。此外,我觉得可能有一种方法可以做到这一点,而不使用UDF..但不确定从哪里开始。对这里可能发生的事情有什么建议吗?

  • 问题内容: 我在通过VBA中的Selenium引用网站上的搜索框时遇到了麻烦。该框的HTML代码为: 我努力了 但是它们似乎都不起作用。任何帮助是极大的赞赏。 问题答案: 要在所需元素内发送 字符序列,可以使用以下定位策略之一: 使用: 使用: