当前位置: 首页 > 知识库问答 >
问题:

Akka Streams:在GraphStage源代码中处理未来

戚研
2023-03-14

我试图构建一个Akka流源,它通过发出futureAPI调用来接收数据(API的本质是滚动,它增量地获取结果)。为了构建这样的源码,我使用了GraphStage。

我修改了NumberSource示例,它只是一次推送一个int。我所做的唯一更改是将Int替换为getvalue():future[Int](以模拟API调用):

class NumbersSource extends GraphStage[SourceShape[Int]] {
  val out: Outlet[Int] = Outlet("NumbersSource")
  override val shape: SourceShape[Int] = SourceShape(out)

  // simple example of future API call
  private def getvalue(): Future[Int] = Future.successful(Random.nextInt())


  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) {

      setHandler(out, new OutHandler {
        override def onPull(): Unit = {
            // Future API call
            getvalue().onComplete{
              case Success(value) =>
                println("Pushing value received..") // this is currently being printed just once
                push(out, counter)
              case Failure(exception) =>
            }
          }
        }
      })
    }
}

// Using the Source and Running the stream

  val sourceGraph: Graph[SourceShape[Int], NotUsed] = new NumbersSource
  val mySource: Source[Int, NotUsed] = Source.fromGraph(sourceGraph)

  val done: Future[Done] = mySource.runForeach{
    num => println(s"Received: $num") // This is currently not printed
  }
  done.onComplete(_ => system.terminate())

上面的代码不起作用。sethandler中的println语句只执行一次,不向下游推送任何内容。

这类未来来电应如何处理?谢了。

class NumbersSource(futureNum: Future[Int]) extends GraphStage[SourceShape[Int]] {
  val out: Outlet[Int] = Outlet("NumbersSource")
  override val shape: SourceShape[Int] = SourceShape(out)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) {

      override def preStart(): Unit = {
        val callback = getAsyncCallback[Int] { (_) =>
          completeStage()
        }
        futureNum.foreach(callback.invoke)
      }

      setHandler(out, new OutHandler {
        override def onPull(): Unit = {
          val value: Int = ??? // How to get this value ??
          push(out, value)
        }
      })
    }
}

// Using the Source and Running the Stream

def random(): Future[Int] = Future.successful(Random.nextInt())

  val sourceGraph: Graph[SourceShape[Int], NotUsed] = new NumbersSource(random())
val mySource: Source[Int, NotUsed] = Source.fromGraph(sourceGraph)

  val done: Future[Done] = mySource.runForeach{
    num => println(s"Received: $num") // This is currently not printed
  }
  done.onComplete(_ => system.terminate())
val value = grab(in) // where in is Inlet of a Flow

共有1个答案

梁俊友
2023-03-14

我不确定我的理解是否正确,但是如果您试图实现一个无限的源代码,而不是在Futures中计算的元素,那么实际上没有必要使用您自己的GraphStage来实现这个源代码。您可以简单地按以下方式进行:

Source.repeat(())
    .mapAsync(parallelism) { _ => Future.successful(Random.nextInt()) }

source.repeat(())只是一些任意值的无限源(在本例中为unit类型,但是您可以将()更改为您想要的任何值,因为这里忽略了它)。然后使用MapAsync将异步计算集成到流中。

 类似资料:
  • 在你的代码合并,压缩或编译后,保持客户端代码可读性和可调试性。使用Source Maps(源码映射)将源代码映射到已编译的代码。 TL;DR 使用Source Maps(源码映射)将压缩代码映射到源代码。然后,您可以在其原始源代码中读取和调试编译的代码。 仅使用能够生成Source Maps(源码映射)的预处理器。 验证您的web 服务器是否可以为Source Maps(源码映射)提供服务。 开始

  • 假设我试图从使用基本身份验证/基本证书的RESTful api中提取,那么在我的程序中存储用户名和密码的最佳方式是什么?现在它只是以明文形式存在。 有没有更安全的方法? 谢啦

  • 我正在使用带有thymeleaf的Spring Boot,我所有的资源都在Spring应用程序之外的路径上,例如。在dev env上应该使用url解析路径,并且live env继续路径。 为什么资源处理程序不处理这些类型的资源,但是如果我处理没有问题?我错过了什么吗? 编辑:如果是并且位置是url也没有被处理

  • 问题内容: 我目前在一个项目中,其中的代码中有大约3000行的SQL字符串。 该项目是一个Java项目,但是这个问题可能适用于任何语言。 无论如何,这是我第一次见到如此糟糕的东西。该代码库是遗留的,因此我们可以突然迁移到Hibernate或类似的东西。 您该如何处理非常大的SQL字符串? 我知道这很不好,但是我不知道建议什么是最好的解决方案。 问题答案: 在我看来,将那些硬编码的值放入存储过程中,

  • 问题内容: 假设我试图从使用基本身份验证/基本证书的RESTful API中提取信息,那么在该程序中存储该用户名和密码的最佳方法是什么?现在,它只是以纯文本格式坐在那里。 有什么方法可以做到更注重安全性吗? 问题答案: 在@ Damien.Bell的请求下,以下示例涵盖了第一步和第二步: 一个解决每个保护步骤的完整示例将远远超出我认为对该问题的合理范围,因为它是关于“步骤是什么”而不是“如何应用它

  • 我试图在一个项目上执行一些PHP代码(使用Dreamweaver),但代码没有运行。 当我检查源代码时,PHP代码显示为HTML标记(我可以在源代码中看到)。Apache运行正常(我正在使用XAMPP),PHP页面打开正常,但PHP代码没有执行。 有人对正在发生的事情有什么建议吗? 编辑:代码..: