当前位置: 首页 > 知识库问答 >
问题:

如何等待几个期货?

奚翰海
2023-03-14

假设我有几个未来,需要等到它们中的任何一个失败或全部成功。

例如:设有3个期货:f1f2f3

>

  • 如果f1成功而f2失败,我不会等待f3(并将失败返回给客户端)。

    如果<code>f2</code>失败,而<code>f1</code>和<code>f3</code>仍在运行,我不会等待它们(并返回故障)

    如果f1成功,然后f2成功,我继续等待f3

    你将如何实现它?

  • 共有3个答案

    柳均
    2023-03-14

    这里有一个不使用演员的解决方案。

    import scala.util._
    import scala.concurrent._
    import java.util.concurrent.atomic.AtomicInteger
    
    // Nondeterministic.
    // If any failure, return it immediately, else return the final success.
    def allSucceed[T](fs: Future[T]*): Future[T] = {
      val remaining = new AtomicInteger(fs.length)
    
      val p = promise[T]
    
      fs foreach {
        _ onComplete {
          case s @ Success(_) => {
            if (remaining.decrementAndGet() == 0) {
              // Arbitrarily return the final success
              p tryComplete s
            }
          }
          case f @ Failure(_) => {
            p tryComplete f
          }
        }
      }
    
      p.future
    }
    
    冯鸿光
    2023-03-14

    您可以使用promise,并向其发送第一个失败或最终完成的聚合成功:

    def sequenceOrBailOut[A, M[_] <: TraversableOnce[_]](in: M[Future[A]] with TraversableOnce[Future[A]])(implicit cbf: CanBuildFrom[M[Future[A]], A, M[A]], executor: ExecutionContext): Future[M[A]] = {
      val p = Promise[M[A]]()
    
      // the first Future to fail completes the promise
      in.foreach(_.onFailure{case i => p.tryFailure(i)})
    
      // if the whole sequence succeeds (i.e. no failures)
      // then the promise is completed with the aggregated success
      Future.sequence(in).foreach(p trySuccess _)
    
      p.future
    }
    

    然后,如果您想阻止,您可以在生成的Future上等待,或者只是将其映射到其他内容中。

    for comprehension的不同之处在于,这里您得到的是第一个失败的错误,而for comprehension得到的是输入集合遍历顺序中的第一个错误(即使另一个错误先失败)。例如:

    val f1 = Future { Thread.sleep(1000) ; 5 / 0 }
    val f2 = Future { 5 }
    val f3 = Future { None.get }
    
    Future.sequence(List(f1,f2,f3)).onFailure{case i => println(i)}
    // this waits one second, then prints "java.lang.ArithmeticException: / by zero"
    // the first to fail in traversal order
    

    和:

    val f1 = Future { Thread.sleep(1000) ; 5 / 0 }
    val f2 = Future { 5 }
    val f3 = Future { None.get }
    
    sequenceOrBailOut(List(f1,f2,f3)).onFailure{case i => println(i)}
    // this immediately prints "java.util.NoSuchElementException: None.get"
    // the 'actual' first to fail (usually...)
    // and it returns early (it does not wait 1 sec)
    

    徐兴昌
    2023-03-14

    您可以改用如下 for-comprehensiion:

    val fut1 = Future{...}
    val fut2 = Future{...}
    val fut3 = Future{...}
    
    val aggFut = for{
      f1Result <- fut1
      f2Result <- fut2
      f3Result <- fut3
    } yield (f1Result, f2Result, f3Result)
    

    在这个例子中,期货1、2和3并行启动。然后,在理解中,我们等待结果1,然后是2,然后是3。如果1或2失败,我们将不再等待3。如果所有3个都成功,那么aggFutval将持有一个具有3个插槽的元组,对应于3个期货的结果。

    现在,如果您需要在假设Fut2先失败时停止等待的行为,事情会变得有点棘手。在上面的示例中,您必须等待Fut1完成,然后才能意识到Fut2失败。要解决这个问题,您可以尝试这样的方法:

      val fut1 = Future{Thread.sleep(3000);1}
      val fut2 = Promise.failed(new RuntimeException("boo")).future
      val fut3 = Future{Thread.sleep(1000);3}
    
      def processFutures(futures:Map[Int,Future[Int]], values:List[Any], prom:Promise[List[Any]]):Future[List[Any]] = {
        val fut = if (futures.size == 1) futures.head._2
        else Future.firstCompletedOf(futures.values)
    
        fut onComplete{
          case Success(value) if (futures.size == 1)=> 
            prom.success(value :: values)
    
          case Success(value) =>
            processFutures(futures - value, value :: values, prom)
    
          case Failure(ex) => prom.failure(ex)
        }
        prom.future
      }
    
      val aggFut = processFutures(Map(1 -> fut1, 2 -> fut2, 3 -> fut3), List(), Promise[List[Any]]())
      aggFut onComplete{
        case value => println(value)
      }
    

    现在,这是正确的,但问题在于知道在成功完成映射后,要从<code>映射中删除哪个<code>未来。只要你有办法将一个结果与产生该结果的未来正确地关联起来,那么类似的事情就行了。它只是递归地从映射中删除已完成的期货,然后调用Future。在剩余的期货上首先完成,直到没有剩余,一路上收集结果。这并不漂亮,但如果你真的需要你正在谈论的行为,那么这个或类似的东西可以工作。

     类似资料:
    • 有没有更好的写法呢?由于某种原因,我真的需要这个循环吗?(似乎与有关)

    • 问题内容: 我正在研究Java Selenium-WebDriver。我加了 和 因为我的应用程序需要几秒钟来加载用户界面。所以我设置了2秒的隐式等待。但是我找不到元素文本框 然后我添加 现在工作正常。哪一个是更好的方法? 问题答案: 好吧,有两种类型的等待:显式和隐式等待。显式等待的想法是 隐式等待的概念是 你可以在此处获得细节上的差异。 在这种情况下,我宁愿使用显式等待(尤其是): 函数返回找

    • 我在做一个JavaSelenium-WebDriver 和 因为我的应用程序只需几秒钟就能加载用户界面。所以我设定了2秒的等待时间。但我找不到元素文本框 然后我添加

    • 我正在运行这样的多个服务:(例如,在多个线程中读取文件) 是一个扩展类的类,它正在做一些事情,比如读取文件。它是从另一个线程调用的,该线程不是JavaFX应用程序线程。 我如何等到所有这些服务完成后再调用? 可复制的例子:

    • 问题内容: 如何更改以下代码,以触发两个异步操作并有机会同时运行? 我需要做这样的事情吗? 问题答案: TL; DR 不要在获得承诺的问题中使用模式,而是分别等待它们;而是使用(至少现在): 虽然您的解决方案 确实 并行运行这两个操作,但如果两个诺言都被拒绝,它就无法正确处理拒绝。 细节: 您的解决方案并行运行它们,但始终等待第一个完成,然后再等待第二个。 如果您只想启动它们,并行运行它们,并获得

    • 问题内容: 我如何更改以下代码,以触发两个异步操作并有机会同时运行? 我需要做这样的事情吗? 问题答案: TL; DR 不要在获得承诺的问题中使用模式,而是分别等待它们;而是使用(至少现在): 虽然您的解决方案确实并行运行这两个操作,但是如果两个诺言都被拒绝,它就无法正确处理拒绝。 细节: 您的解决方案并行运行它们,但始终等待第一个完成,然后再等待第二个。如果您只想启动它们,并行运行它们,并获得两