当前位置: 首页 > 知识库问答 >
问题:

期货列表-当异常发生时杀死所有期货

楚瑞
2023-03-14

我使用scala futures异步提交了1000份工作。我还实现了一个由并发阻塞队列支持的ThrottledExecutionContext,这样它一次最多只能运行100个作业,并将其余的放入队列中。这是一个阻塞操作,因为它涉及调用第三方服务本身。当其中一个抛出异常时,我需要重试整个操作(1000个作业)或者跳过整个批处理。当某些期货仍在运行时,我不能重试。我有办法知道在任何给定的时间点有多少作业在第三方系统中运行(< code>spark)。因此,一旦我捕捉到一个异常,我想首先杀死所有剩余的期货,清空队列,等待第三方完成该批处理的任何挂起的作业,然后重试。那么,有没有一种方法可以在一个例外中扼杀所有的未来?

我根据下面的讨论尝试了failFast,但它没有达到我预期的效果。我对< code>Promise还没有更好的理解。但似乎我们可以用< code >promise来控制< code >未来的未来!

Scala未来/promise快速失败管道

  var atomicnt = new AtomicInteger() // to track how many jobs were finished when exception occured

  def failFast[T](futures: Seq[Future[T]]): Future[Seq[T]] = {
    val promise = Promise[Seq[T]]
    futures.foreach{f => f.onFailure{case ex => promise.failure(ex)}}
    val res = Future.sequence(futures)
    promise.completeWith(res).future
  }

  def normalTask() =  {
    println("Starting normaltask")
    Thread.sleep(2000 + Random.nextInt(5000))
    if(Random.nextDouble() > 0.5) {
      println("Throwing random exception..")
      throw new RuntimeException("Random exception from normalTask")
    }
    atomicnt.getAndIncrement
    Thread.sleep(2000 + Random.nextInt(5000))
    println("Finished normaltask")
  }

  def testException() = {
    val rg = (0 until 500)
    val futures = rg.map(i =>{
      Future(normalTask)
    })
    val res = failFast(futures)
    Await.result(res, Duration.Inf) //blocking here to wait for all 500 to finish    
  }

  def batchProcessing() {
    
    try {
      println("Starting batchProcessing")
      testException()
      println("Exiting batchProcessing")      
    } catch {
      case t: Throwable => {
        println("Error in main")
        Thread.sleep(10000) //Here while waiting other futures are still running
        t.printStackTrace()
        // retry logic goes here based on failure or entire batch will be skipped 
      }
    }
    
  }

然而,当我在批处理中捕捉到异常时,其他期货仍在运行。

我尝试并行处理的另一种选择是使用并行集合,这似乎有效。即。如果任何任务失败,则整个并行操作将失败。但是,问题在于吞吐量,它受我可用的CPU数量的限制。由于所有任务都长时间运行并且阻止并行收集感觉不是正确的方法。

共有1个答案

蒙经纶
2023-03-14

内置scala<code>Future</code>一旦启动就不能中断。

似乎您需要一些类似于monix任务或ZIO的东西,可以很容易地中断和重试。

 类似资料:
  • 我有一个可完成期货的列表,我想从第一个期货开始,如果有任何完成例外,我想尝试列表中的下一个期货,依此类推,直到我耗尽了我所有的期货。如果任何一个期货成功了,我想就此止步,而不使用列表中的下一个期货。我如何做到这一点?到目前为止,我已经尝试过: 但是当我测试这种方法时,我看到当未来完成失败时,会抛出异常,并且不会尝试下一组期货。 编辑: 这就是样本的样子

  • Java 8的< code > CompletableFuture . allof(CompletableFuture 如果我的一个期货异常完成,那么< code > completablefuture . allof 会在抛出< code>CompletionException之前等待其余期货完成,还是会取消其余期货? 如果它等待所有期货完成,有没有办法让它在任何期货抛出异常并取消剩余期货时立即

  • 在这种情况下,我的做法是使用 将 转换为 。然后使用 获取结果。但是,可能有一个任务需要很长时间并且超时。在这种情况下,我仍然希望获得其余结果(同时并行运行所有任务)。可能吗?怎么办? 谢谢

  • 假设我有一个抽象的“生产者”实例: 我需要对它产生的每个(或一些)对象进行一些处理。所以,我做了类似的事情: …并以<code>Future[Seq[Future[T]]]结束。这没关系,但有点麻烦。我想摆脱外部的,只需要就可以了,但我想不出一个(非阻塞)转换,可以让我这样做。 有什么想法吗?

  • 我正在阅读Scala 2.11.8留档的函数在scala.concurrent.Future模块,它说: 将副作用函数应用于这个未来的结果,并返回一个包含这个未来的结果的新的未来。 这个方法允许强制回调以指定的顺序执行。 请注意,如果其中一个链式第四个回调引发异常,则该异常不会传播到后续的第四个调用。相反,随后的第四次回调将被赋予此未来的原始值。 我不确定< code >和不传播异常到底是什么意思

  • 我在从JSON发送日期字段时收到一个错误。 Pojo类: 错误: JSON分析错误:无法从字符串“2018-07-10”反序列化类型的值:格式应为“yyyy-mm-dd hh:mm:ss.000”;嵌套异常为com.fasterxml.jackson.databind.exc.InvalidFormatException:无法从字符串“2018-07-10”反序列化类型的值:格式应为“yyyy-m