当前位置: 首页 > 知识库问答 >
问题:

并行运行多个期货,超时时返回默认值

翟曦
2023-03-14

我必须并行运行多个期货,程序不应该崩溃或挂起。

目前,我一个接一个地等待期货,如果存在TimeoutException,则使用回退值。

val future1 = // start future1
val future2 = // start future2
val future3 = // start future3

// <- at this point all 3 futures are running

// waits for maximum of timeout1 seconds
val res1 = toFallback(future1, timeout1, Map[String, Int]())
// .. timeout2 seconds 
val res2 = toFallback(future2, timeout2, List[Int]())
// ... timeout3 seconds
val res3 = toFallback(future3, timeout3, Map[String, BigInt]()) 

def toFallback[T](f: Future[T], to: Int, default: T) = {
  Try(Await.result(f, to seconds))
    .recover { case to: TimeoutException => default }
}

如我所见,这个代码片段的最大等待时间是< code > time out 1 time out 2 time out 3

我的问题是:我怎么能同时等待所有这些期货,这样我就可以将等待时间减少到最大值(timeout1,timeout2,timeout3)

编辑:最后我用了@Jatin和@senia的修改答案:

private def composeWaitingFuture[T](fut: Future[T], 
                                    timeout: Int, default: T) =
  future { Await.result(fut, timeout seconds) } recover {
    case e: Exception => default
  }

后来它使用如下:

// starts futures immediately and waits for maximum of timeoutX seconds
val res1 = composeWaitingFuture(future1, timeout1, Map[String, Int]())
val res2 = composeWaitingFuture(future2, timeout2, List[Int]())
val res3 = composeWaitingFuture(future3, timeout3, Map[String, BigInt]()) 

// takes the maximum of max(timeout1, timeout2, timeout3) to complete
val combinedFuture =
  for {
    r1 <- res1
    r2 <- res2
    r3 <- res3
  } yield (r1, r2, r3)

后来我使用了我认为合适的cominedFuture

共有3个答案

公羊宇定
2023-03-14

我会避免使用<code>等待。结果,因为它只使用线程进行阻塞。实现期货超时的一个选项是:

val timer = new Timer()

def toFallback[T](f: Future[T], timeout: Int, default: T) = {
  val p = Promise[T]()
  f.onComplete(result => p.tryComplete(result))
  timer.schedule(new TimerTask {
    def run() {
      p.tryComplete(Success(default))
    }
  }, timeout)
  p.future
}

这就产生了一个promise,在指定的超时之后,这个promise将由未来或默认结果来完成——无论哪一个先发生。

要同时运行查询,请执行以下操作:

val future1 = // start future1
val future2 = // start future2
val future3 = // start future3

val res1 = toFallback(future1, timeout1, Map[String, Int]())
val res2 = toFallback(future2, timeout2, List[Int]())
val res3 = toFallback(future3, timeout3, Map[String, BigInt]())

val resultF = for {
  r1 <- res1
  r2 <- res2
  r3 <- res3
} yield (r1, r2, r3)

val (r1, r2, r3) = Await.result(resultF, Duration.Inf)
println(s"$r1, $r2, $r3")

//or
resultF.onSuccess {
  case (r1, r2, r3) => println(s"$r1, $r2, $r3")
}
郦兴德
2023-03-14
def toFallback[T](f: Future[T], to: Int, default: T) = {
  future{
  try{
        Await.result(f, to seconds)
   }catch{
        case e:TimeoutException => default
  }
 }

您甚至可以使此块异步并且每个请求等待其最大时间。如果线程太多,可能有一个线程使用Akka的系统调度程序不断检查其他期货。@Senia在下面回答了这个问题。

赫连照
2023-03-14

您可以创建< code>future,使用< code>flatMap或for-comprehensive返回所有3个未来的结果:

val combinedFuture =
  for {
    r1 <- future1
    r2 <- future2
    r3 <- future3
  } yield (r1, r2, r3)

val (r1, r2, r3) = Await.result(combinedFuture , Seq(timeout1, timeout2, timeout3).max)

如果您使用的是 akka,则可以在超时后使用默认值完成未来:

implicit class FutureHelper[T](f: Future[T]) extends AnyVal{
  import akka.pattern.after
  def orDefault(t: Timeout, default: => T)(implicit system: ActorSystem): Future[T] = {
    val delayed = after(t.duration, system.scheduler)(Future.successful(default))
    Future firstCompletedOf Seq(f, delayed)
  }
}

val combinedFuture =
  for {
    r1 <- future1.orDefault(timeout1, Map())
    r2 <- future2.orDefault(timeout2, List())
    r3 <- future3.orDefault(timeout3, Map())
  } yield (r1, r2, r3)

val (r1, r2, r3) = Await.result(combinedFuture , allowance + Seq(timeout1, timeout2, timeout3).max)
 类似资料:
  • 问题内容: 我似乎有时会遇到一些tcp请求被“卡住”的麻烦,就像它正在等待一些响应,但连接已被“切断”,因此响应永远不会到来。这是具有默认超时的HttpURLConnection的预期行为吗?是否设置了明智的默认设置,以便默认情况下我无法进入这种奇怪的“挂起”情况? 问题答案: 出现HttpURLConnection的“默认”超时为零,表示“无超时”。 不幸的是,根据我的经验,根据您与服务器的连接

  • 问题内容: Python 3.4。尝试在urllib.request.urlopen()中查找默认超时是多少。 它的签名是:urllib.request.urlopen(URL,data = None,[timeout,] *,cafile = None,capath = None,cadefault = False,context = None) 该文档称其为“全局默认超时”,并查看其代码:so

  • 问题内容: 在我的脚本中,永远不会返回: 可能是什么原因?有补救办法吗?使用的默认超时是多少? 问题答案: 获取使用的默认超时是多少? 默认超时为,这意味着它将等待(挂起)直到连接关闭。 当您传递超时值时会发生什么?

  • 在JSF2应用程序中,当文件中没有明确提到会话超时时,会话超时是什么? 更新:我正在使用Tomcat,请参阅此处有关Tomcat中默认超时的相关帖子。

  • 问题内容: 在这里,我需要同时执行,并在同一时间。 当我尝试在其上放置一个并行块时,由于在官方站点中这样提到,因此它引发了错误。 } 问题答案: 您不必将每个调用都放在阶段内的并行作业中,因此可以这样进行:

  • 问题内容: 默认情况下,PHP会话是否超时-即,如果我没有任何编码,最终在一段时间不活动后最终将“注销”用户吗? 问题答案: 这取决于服务器配置或相关指令的session.gc_maxlifetime在。 通常情况下,默认值为24分钟(1440秒),但是您的虚拟主机可能已将默认值更改为其他值。