当前位置: 首页 > 知识库问答 >
问题:

等待一个超时的期货序列,而不会在TimeoutException上失败

常英毅
2023-03-14

我有一系列相同类型的scala期货。

我想在一段有限的时间后,得到整个序列的结果。虽然有些期货可能已经成功,有些可能已经失败,有些还没有完成,但未完成的期货应该被视为失败。

我不想使用“依次等待每个未来”。

我确实看了这个问题:Scala等待未来序列,并尝试从那里使用解决方案,即:

  private def lift[T](futures: Seq[Future[T]])(implicit ex: ExecutionContext) =
    futures.map(_.map { Success(_) }.recover { case t => Failure(t) })

  def waitAll[T](futures: Seq[Future[T]])(implicit ex: ExecutionContext) =
    Future.sequence(lift(futures))

  futures: Seq[Future[MyObject]] = ...
  val segments = Await.result(waitAll(futures), waitTimeoutMillis millis)

但是我仍然有一个超时异常,我想是因为一些未来还没有完成。答案还指出,

现在,Future.sequence(lifted)将在每个Future完成时完成,并将使用Try表示成功和失败。

但是我希望我的Future在超时之后完成,而不是在序列中的每个Future都完成时。我还能做什么?

共有2个答案

姜弘化
2023-03-14

这是使用monix的解决方案

import monix.eval.Task
import monix.execution.Scheduler

val timeoutScheduler = Scheduler.singleThread("timeout") //it's safe to use single thread here because timeout tasks are very fast

def sequenceDiscardTimeouts[T](tasks: Task[T]*): Task[Seq[T]] = {
  Task
    .parSequence(
      tasks
        .map(t =>
          t.map(Success.apply) // Map to success so we can collect the value
            .timeout(500.millis)
            .executeOn(timeoutScheduler) //This is needed to run timesouts in dedicated scheduler that won't be blocked by "blocking"/io work if you have any
            .onErrorRecoverWith { ex =>
              println("timed-out")
              Task.pure(Failure(ex)) //It's assumed that any error is a timeout. It's possible to "catch" just timeout exception here
            }
        )
    )
    .map { res =>
      res.collect { case Success(r) => r }
    }
}

测试代码

implicit val mainScheduler = Scheduler.fixedPool(name = "main", poolSize = 10)


def slowTask(msg: String) = {
  Task.sleep(Random.nextLong(1000).millis) //Sleep here to emulate a slow task
    .map { _ =>
      msg
    }
}


val app = sequenceDiscardTimeouts(
  slowTask("1"),
  slowTask("2"),
  slowTask("3"),
  slowTask("4"),
  slowTask("5"),
  slowTask("6")
)

val started: Long = System.currentTimeMillis()
app.runSyncUnsafe().foreach(println)
println(s"Done in ${System.currentTimeMillis() - started} millis")

这将为每次运行打印不同的输出,但它应该如下所示

timed-out
timed-out
timed-out
3
4
5
Done in 564 millis

请注意使用了两个独立的调度程序。这是为了确保即使< code>main调度程序忙于业务逻辑,也会触发超时。您可以通过减少主调度程序的< code>poolSize来测试它。

戈华茂
2023-03-14

如果我使用raw Future(而不是内置了这种功能的IO monad,或者没有专门用于这种功能的Akka实用程序),我会编写如下实用程序:

// make each separate future timeout
object FutureTimeout {
  // separate EC for waiting
  private val timeoutEC: ExecutorContext = ...

  private def timeout[T](delay: Long): Future[T] = Future {
    blocking {
      Thread.sleep(delay)
    }
    throw new Exception("Timeout")
  }(timeoutEC)

  def apply[T](fut: Future[T], delat: Long)(
    implicit ec: ExecutionContext
  ): Future[T] = Future.firstCompletedOf(Seq(
    fut,
    timeout(delay)
  ))
}

然后

Future.sequence(
  futures
    .map(FutureTimeout(_, delay))
    .map(Success(_))
    .recover { case e => Failure(e) }
)

由于每个未来至多在< code>delay之后终止,我们将能够在那之后立即将它们收集到一个结果中。

但你必须记住,无论你如何触发超时,你都不能保证超时的未来会停止执行。它可以在某个地方的某个线程上运行,只是你不会等待结果。firstCompletedOf只是让这个种族更加明确。

其他一些实用程序(例如Cats Effect IO)允许您取消计算(这在像这样的比赛中使用),但是您仍然必须记住JVM不能任意“杀死”一个正在运行的线程,所以取消会发生在一个计算阶段完成之后,下一个阶段开始之前(例如在< code >之间)。映射或< code >。平面图。

如果你不害怕添加外部deps,还有其他(更可靠的,因为Thread.sleep只是一个暂时的丑陋的黑客)方法来计时未来,比如Akka utils。另请参阅类似这样的其他问题。

 类似资料:
  • 现在我想让所有的期货最多等待n秒,直到全部完成。我知道我可以调用,但是如果我在循环中对我的所有期货顺序地调用它,则超时开始增加。伪代码: 块会出现超时,直到结果就绪。因此,如果第一个在超时之前完成,第二个也在超时之前完成,依此类推,则整个执行时间最多为而不是。 因此,我正在寻找一个方法,它接受的列表和一个超时,并行运行所有的结果,然后返回一个未来结果的集合。有什么想法吗?

  • 假设我有几个未来,需要等到它们中的任何一个失败或全部成功。 例如:设有3个期货:、、。 > 如果成功而失败,我不会等待(并将失败返回给客户端)。 如果<code>f2</code>失败,而<code>f1</code>和<code>f3</code>仍在运行,我不会等待它们(并返回故障) 如果成功,然后成功,我继续等待。 你将如何实现它?

  • 我在Scala中有一个要求,即运行一系列http调用,这些调用必须按顺序完成且不阻塞。我怎样才能做到这一点?

  • 我对Selenium和Python是新手。我可以浏览网站,找到元素并打印出来,但速度很慢 Python版本:3.10;Selenium WebDrive:Firefox;IDE:PyCharm 2021.3.2(CE);操作系统:Fedora 35 VM 我试图打印的元素的HTML代码: 我的Python Selenium找到了大部分时间都能正常工作的代码(但当服务器需要很长时间才能响应时失败):

  • Html树: Xpath://table[@class='ur MatrixLayout urhtmltableReset']//tr//table//tr//td//div//div/span[contains(text(),'revisations')]

  • 我启动了几个异步进程,如果需要,这些进程反过来可以启动更多的进程(想想遍历目录结构或类似的东西)。每个进程都会返回一些东西,最后我想等待所有这些进程的完成,并安排一个函数来处理结果集合。 我的解决方案尝试使用可变的(我不断添加我生成的期货)和来安排一些函数在此缓冲区中列出的所有这些期货完成后运行。 我准备了一个简单的例子来说明这个问题: 它首先调度和期货,然后将在1秒后的分辨率中调度。本身将在2秒