当前位置: 首页 > 知识库问答 >
问题:

Akka未来-需要建议

艾令秋
2023-03-14

我有一个场景,其中我得到一个String消息列表,我必须遍历String并调用另一个方法,这是一个长时间运行的过程。然后我必须收集这个长时间运行过程的结果并连接结果并将其发送回用户交互界面。我对Scala中的这些未来概念很陌生。我正在使用Play框架,其中字符串列表将来自用户交互界面。这是我第一次尝试实现ht场景的样子:

def futuresTest(strList: List[String]) = Action {
  Async {

    val ftrList: List[Future[String]] =
      strList.map(s => Akka.future {longRunningCall(s)} ).toList

    val futureResultList = Future.sequence(ftrList)

    val jsonResponse: String =
      futureResultList.map(_.sum).asInstanceOf[String]

    Akka.future { Ok(jsonResponse) }
  }
}

为简单起见,long RunningCall将只返回一个字符串。稍后我将把它与原始实现联系起来。

def longRunningCall(s: String) = "test"

我的问题是,在声明中:

val ftrList: List[Future[String]] =
  strList.map(s => Akka.future {longRunningCall(s)} ).toList

我假设ftrList将被异步填充,当它遇到下面一行时,我保证futureResultList将包含所有元素(即strList和futureResultList大小相等?

val futureResultList = Future.sequence(ftrList)

请指教!

共有3个答案

隆璞
2023-03-14

在我看来,对于您想要的东西,在这里只使用Scala的并行集合而不是Futures会简单得多。通过使用.par.map,您可以并行地对列表中的每个项目执行长时间运行的操作,并将结果收集在一起。

def getResponse(strList: List[String]) = Action {

    val results: List[String] = strList.par.map(longRunningCall(_))
    val jsonResponse: String = results.mkString(",") //concatenate using ','

    Ok(jsonResponse)
}

http://docs.scala-lang.org/overviews/parallel-collections/configuration.html

阎辰钊
2023-03-14

只要longRunningCall正确返回值,您所做的工作就会正确。ftrList大小将等于strList大小,除非longRunningCall出现异常

魏毅
2023-03-14

我将在这里假设您的意思是要连接字符串。首先是对代码的一些评论:

> < li>

不需要将整个块包装在异步中,只需返回最终的未来。

值类型都可以推断,您不需要显式地声明它们

映射列表将返回列表。对结果调用 toList 是多余的。

List#sum仅适用于数字,请使用fold↓代替字符串。

Future[String]不能通过asInstanceOf直接转换为String。无论如何,您将以Future的形式返回它。

mapFuture.sequence 可以组合成一个 Future.traverse 操作。

并更改代码以应用这些点:

def futuresTest(strList: List[String]) = Action {

  val ftrList = Future.traverse(strList) {
    s => Future( longRunningCall(s) )
  }

  // this will be a Future[String]
  val jsonResponse = ftrList map { _.foldLeft("")(_ + _) }

  Async { jsonResponse map (OK(_)) }
}

最后两行也可以合并:

  Async {
    ftrList map { xs => OK(xs.foldLeft("")(_ + _)) }
  }

更新

使用Future也是一样的。折叠,正如维克多所建议的

def futuresTest(strList: List[String]) = Action {     
  val ftrList = strList map { longRunningCall(_) } // List[Future]
  val jsonResponse = Future.fold(ftrList)("")(_ + _) // Future[String]
  Async { jsonResponse map (OK(_)) }
}

为了处理故障,您希望恢复未来并让它返回不同的响应:

  Async { jsonResponse map (OK(_)) recover (InternalServerError(_.toString)) }

如果您想处理每个元素的单个错误,那么您应该看看这个答案中使用的技术。

 类似资料:
  • 我有两个演员。每个参与者都位于不同的ActorSystem中。第一个缓存第二个ActorRef。第一个演员: 并向第二个参与者发送消息,第二个参与者使用 问题:从第一个演员到第二个演员的最初讲述()有时需要1-3分钟(!)传递信息。 除了这个消息之外,没有其他消息在Akka中发送,这意味着邮箱是空的-系统正在为单个请求提供服务。 系统详情: 该应用程序有500个计划的参与者,他们每30秒(阻塞)轮

  • 我的服务代码如下所示, 在我的AKKA HTTP路由中,我尝试从返回的未来构建,如下所示, 我不确定如何提交给响应。被传递的未来实质上是一系列预期按顺序执行的平面映射的未来。但是,我不相信这会作为分块字节流返回到客户端。 然而,我只得到最后一个未来的结果如下, 亲切地问候Meeraj

  • 相关:java.util.concurrent.Future的scala.concurrent.Future包装器 这来自我的另一个问题: 如何将akka streams Kafka(reactive-Kafka)集成到akka http应用中? 我有一个AKKA HTTP应用程序,我想在路由中的onComplete函数中向Kafka发送消息/产品记录,如下所示: 但是,on complete(p

  • 我用的是Scala 2.10,Akka 2.1和Play 2.1。当我向后端发送一个http请求时,我要求一个参与者计算一些东西。如果计算结果在超时之前返回,则返回计算结果,否则返回另一个字符串。请参阅下面的代码。 演员如下: 我的问题是,即使演员在超时之前完成,未来也不会“返回”任何内容,因此超时总是过期。我做错了什么?谢谢。

  • 假设接收处理程序调用一个返回future的操作,我们使用dispatcher作为上下文执行器来映射它,最后我们设置一个回调来改变执行元状态。 从回调中更改执行元的状态,甚至使用执行元调度程序作为执行上下文,这是线程安全的吗?

  • 我在写一个小服务器。一篇博文特别推荐了这种提问模式。课程具有以下特点: 我正在检索这个数据库行,如果它不存在或发生了一些错误,我想从Spray发送一个响应来告诉客户端。 然而,使用这种模式,我不知道在哪里注入开关。 我试着把代码改成这样: Spray使用发送HTTP响应可以接受两个对象,也可以接受一个字符串/可序列化对象。我想使用双对象模式(它允许我手动编码它的标题),理想的情况应该是 有没有办法