在 scala 中,您将如何编写一个函数来获取期货序列,运行所有函数,不断重试任何失败的函数,并返回结果?
例如,签名可以是:
def waitRetryAll[T](futures: Seq[Future[T]]): Future[Seq[T]]
加分为一个可配置的超时,在这一点上的功能失败,被调用者可以处理这种情况。< br >如果错误案例处理程序可以接收失败的期货列表,则奖励加分。
谢谢!
据我所知,在标准库中没有用于< code>Future超时的实用程序。
您将如何中断/取消JVM上正在进行的计算?在一般情况下,您不能,您只能在线程处于等待状态时中断它,但如果它永远不会等待。用于异步计算(定义取消)的IO库将IO作为一系列较小的不可中断任务执行(每个map/Map平坦创建一个新步骤),如果它们收到取消/超时,它们将继续执行当前任务(因为它们无法停止它),但它们不会开始下一个任务。您可以在超时时返回异常,但仍然会执行最后一步,因此如果它是一些副作用(例如:DB操作)它将在您已经返回失败后完成。
这是不直观和棘手的,我认为这是为什么这种行为没有被添加到标准库的原因。
此外,未来是正在进行的,潜在的副作用操作。您不能接受< code>Future[A]类型的值并重新运行它。但是,您可以通过名称参数传递future,这样在< code >中。recoverWith您可以重新创造未来。
可悲的是,你可以实现类似“重试到LocalDateTime.now开始时间”的东西
def retry[A](future: => Future[A], attemptsLeft: Int, timeoutTime: Instant) =
future.recoverWith {
case error: Throwable =>
if (attemptsLeft <= 0 || Instant.now.isAfter(timeoutTime)) Future.failure(error)
else retryHelper(future, attemptsLeft - 1, timeoutTime)
}
它可以与< code>Future.sequence结合来创建结果列表:
def retryFutures[A](list: List[() => Future[A]) = {
val attempts: Int = ...
val timeout: Instant = ...
Future.sequence(list.map(future => retry(future(), attempts, timeout)))
}
如果要跟踪哪个future失败,哪个future成功:
def futureAttempt[A](future: Future[A]): Future[Either[Throwable, A]] =
future.map(a => Right(a))).recover {
case error: Throwable => Left(error)
}
def retryFutures[A](list: List[() => Future[A]) = {
val attempts: Int = ...
val timeout: Instant = ...
Future.sequence(list.map(future => retry(futureAttempt(future()), attempts, timeout)))
}
如果你不介意取消JVM上的期货,如果你有更多这样的情况,我建议你使用一个库。
如果你想使用实现重试的东西,有cats-retry
如果你想在定义计算方面有比< code>Future更好的东西(例如,不需要你使用按名参数或空函数的东西),试试Monix或ZIO(https://zio.dev/)
基于Retry返回Future考虑的函数
def retry[T](expr: => Future[T], n: Int = 3): Future[Either[Throwable, T]] = {
Future
.unit
.flatMap(_ => expr).map(v => Right(v))
.recoverWith {
case _ if n > 1 => retry(expr, n - 1)
case e => Future.failed(e).recover{case e => Left(e)}
}
}
结合
Future.sequence
它将列表[Future[T]]
转换为未来[List[T]]
。然而,序列
具有快速故障行为,因此我们必须将未来[T]
,提升到将来[T].
把这些部分放在一起,我们可以定义
def waitRetryAll[T](futures: List[() => Future[T]]): Future[List[Either[Throwable, T]]] = {
Future.sequence(futures.map(f => retry(f.apply())))
}
像这样使用它
val futures = List(
() => Future(42),
() => Future(throw new RuntimeException("boom 1")),
() => Future(11),
() => Future(throw new RuntimeException("boom 2"))
)
waitRetryAll(futures)
.andThen { case v => println(v) }
哪些输出
Success(List(Right(42), Left(java.lang.RuntimeException: boom 1), Right(11), Left(java.lang.RuntimeException: boom 2)))
我们可以收集
我们的左
s或右
s并相应地恢复或继续处理,例如
waitRetryAll(futures)
.map(_.collect{ case v if v.isLeft => v })
...
注意我们如何传入List[()=
假设我有一个抽象的“生产者”实例: 我需要对它产生的每个(或一些)对象进行一些处理。所以,我做了类似的事情: …并以<code>Future[Seq[Future[T]]]结束。这没关系,但有点麻烦。我想摆脱外部的,只需要就可以了,但我想不出一个(非阻塞)转换,可以让我这样做。 有什么想法吗?
我有两个在未来发生的计算,如下所示: 我希望它能够运行,以便comp2总是在comp1完成后运行!我知道使用一个表达,我可以组成这两个Future的喜欢。 什么能保证comp1在comp2之前完成?我的意思是这些是发生在不同线程中的计算,并且不能保证运行的顺序。有没有一种方法可以保证没有阻塞的顺序?
我想把这个val: 对它进行一些操作(我正在考虑展平) 然后得到这个结果 如果展平方法在这里不合适,那很好。只要我得到结果。 谢啦!
我试图在我正在编写的脚本中测试错误处理。如果异步函数fetchBar失败,我将模式匹配失败案例,然后返回包含失败结果的成功未来。 然而,当我对这个流进行单元测试时,我在测试失败案例时遇到了麻烦。我在fetchBar上打了一个存根,以返回失败的future,如下所示。 但是我注意到fetchedBar返回的是成功而不是失败。为什么会这样,我如何存根fetchBar函数来创建一个失败的尝试?
我想创建一个函数来返回成功操作的一系列期货的结果。我遇到的问题是返回类型为Unit,并且未来函数正在完成,而无需等待嵌套的未来序列完成。我尝试过不使用on完成函数,而是使用map或平面图,但没有成功。我还想避免使用wait 这个后来会这么叫