当前位置: 首页 > 知识库问答 >
问题:

Scala重试期货序列,直到它们全部完成

尤茂材
2023-03-14

在 scala 中,您将如何编写一个函数来获取期货序列,运行所有函数,不断重试任何失败的函数,并返回结果?

例如,签名可以是:

  def waitRetryAll[T](futures: Seq[Future[T]]): Future[Seq[T]]

加分为一个可配置的超时,在这一点上的功能失败,被调用者可以处理这种情况。< br >如果错误案例处理程序可以接收失败的期货列表,则奖励加分。

谢谢!

共有2个答案

双子民
2023-03-14
匿名用户

据我所知,在标准库中没有用于< code>Future超时的实用程序。

您将如何中断/取消JVM上正在进行的计算?在一般情况下,您不能,您只能在线程处于等待状态时中断它,但如果它永远不会等待。用于异步计算(定义取消)的IO库将IO作为一系列较小的不可中断任务执行(每个map/Map平坦创建一个新步骤),如果它们收到取消/超时,它们将继续执行当前任务(因为它们无法停止它),但它们不会开始下一个任务。您可以在超时时返回异常,但仍然会执行最后一步,因此如果它是一些副作用(例如:DB操作)它将在您已经返回失败后完成。

这是不直观和棘手的,我认为这是为什么这种行为没有被添加到标准库的原因。

此外,未来是正在进行的,潜在的副作用操作。您不能接受< code>Future[A]类型的值并重新运行它。但是,您可以通过名称参数传递future,这样在< code >中。recoverWith您可以重新创造未来。

可悲的是,你可以实现类似“重试到LocalDateTime.now开始时间”的东西

def retry[A](future: => Future[A], attemptsLeft: Int, timeoutTime: Instant) =
  future.recoverWith {
    case error: Throwable =>
      if (attemptsLeft <= 0 || Instant.now.isAfter(timeoutTime)) Future.failure(error)
      else retryHelper(future, attemptsLeft - 1, timeoutTime)
  }

它可以与< code>Future.sequence结合来创建结果列表:

def retryFutures[A](list: List[() => Future[A]) = {
  val attempts: Int = ...
  val timeout: Instant = ...
  Future.sequence(list.map(future => retry(future(), attempts, timeout)))
}

如果要跟踪哪个future失败,哪个future成功:

def futureAttempt[A](future: Future[A]): Future[Either[Throwable, A]] =
  future.map(a => Right(a))).recover {
    case error: Throwable => Left(error)
  }

def retryFutures[A](list: List[() => Future[A]) = {
  val attempts: Int = ...
  val timeout: Instant = ...
  Future.sequence(list.map(future => retry(futureAttempt(future()), attempts, timeout)))
}

如果你不介意取消JVM上的期货,如果你有更多这样的情况,我建议你使用一个库。

如果你想使用实现重试的东西,有cats-retry

如果你想在定义计算方面有比< code>Future更好的东西(例如,不需要你使用按名参数或空函数的东西),试试Monix或ZIO(https://zio.dev/)

淳于禄
2023-03-14

基于Retry返回Future考虑的函数

def retry[T](expr: => Future[T], n: Int = 3): Future[Either[Throwable, T]] = {
  Future
    .unit
    .flatMap(_ => expr).map(v => Right(v))
    .recoverWith {
      case _ if n > 1 => retry(expr, n - 1)
      case e => Future.failed(e).recover{case e => Left(e)}
    }
}

结合

Future.sequence

它将列表[Future[T]]转换为未来[List[T]]。然而,序列具有快速故障行为,因此我们必须将未来[T],提升到将来[T].

把这些部分放在一起,我们可以定义

def waitRetryAll[T](futures: List[() => Future[T]]): Future[List[Either[Throwable, T]]] = {
  Future.sequence(futures.map(f => retry(f.apply())))
}

像这样使用它

val futures = List(
  () => Future(42),
  () => Future(throw new RuntimeException("boom 1")),
  () => Future(11),
  () => Future(throw new RuntimeException("boom 2"))
)

waitRetryAll(futures)
  .andThen { case v => println(v) }

哪些输出

Success(List(Right(42), Left(java.lang.RuntimeException: boom 1), Right(11), Left(java.lang.RuntimeException: boom 2)))

我们可以收集我们的s或s并相应地恢复或继续处理,例如

waitRetryAll(futures)
  .map(_.collect{ case v if v.isLeft => v })
  ...

注意我们如何传入List[()=

 类似资料:
  • 假设我有一个抽象的“生产者”实例: 我需要对它产生的每个(或一些)对象进行一些处理。所以,我做了类似的事情: …并以<code>Future[Seq[Future[T]]]结束。这没关系,但有点麻烦。我想摆脱外部的,只需要就可以了,但我想不出一个(非阻塞)转换,可以让我这样做。 有什么想法吗?

  • 我有两个在未来发生的计算,如下所示: 我希望它能够运行,以便comp2总是在comp1完成后运行!我知道使用一个表达,我可以组成这两个Future的喜欢。 什么能保证comp1在comp2之前完成?我的意思是这些是发生在不同线程中的计算,并且不能保证运行的顺序。有没有一种方法可以保证没有阻塞的顺序?

  • 我想把这个val: 对它进行一些操作(我正在考虑展平) 然后得到这个结果 如果展平方法在这里不合适,那很好。只要我得到结果。 谢啦!

  • 我试图在我正在编写的脚本中测试错误处理。如果异步函数fetchBar失败,我将模式匹配失败案例,然后返回包含失败结果的成功未来。 然而,当我对这个流进行单元测试时,我在测试失败案例时遇到了麻烦。我在fetchBar上打了一个存根,以返回失败的future,如下所示。 但是我注意到fetchedBar返回的是成功而不是失败。为什么会这样,我如何存根fetchBar函数来创建一个失败的尝试?

  • 我想创建一个函数来返回成功操作的一系列期货的结果。我遇到的问题是返回类型为Unit,并且未来函数正在完成,而无需等待嵌套的未来序列完成。我尝试过不使用on完成函数,而是使用map或平面图,但没有成功。我还想避免使用wait 这个后来会这么叫