我使用scala futures异步提交了1000份工作。我还实现了一个由并发阻塞队列支持的ThrottledExecutionContext,这样它一次最多只能运行100个作业,并将其余的放入队列中。这是一个阻塞操作,因为它涉及调用第三方服务本身。当其中一个抛出异常时,我需要重试整个操作(1000个作业)或者跳过整个批处理。当某些期货仍在运行时,我不能重试。我有办法知道在任何给定的时间点有多少作业在第三方系统中运行(< code>spark)。因此,一旦我捕捉到一个异常,我想首先杀死所有剩余的期货,清空队列,等待第三方完成该批处理的任何挂起的作业,然后重试。那么,有没有一种方法可以在一个例外中扼杀所有的未来?
我根据下面的讨论尝试了failFast,但它没有达到我预期的效果。我对< code>Promise还没有更好的理解。但似乎我们可以用< code >promise来控制< code >未来的未来!
Scala未来/promise快速失败管道
var atomicnt = new AtomicInteger() // to track how many jobs were finished when exception occured
def failFast[T](futures: Seq[Future[T]]): Future[Seq[T]] = {
val promise = Promise[Seq[T]]
futures.foreach{f => f.onFailure{case ex => promise.failure(ex)}}
val res = Future.sequence(futures)
promise.completeWith(res).future
}
def normalTask() = {
println("Starting normaltask")
Thread.sleep(2000 + Random.nextInt(5000))
if(Random.nextDouble() > 0.5) {
println("Throwing random exception..")
throw new RuntimeException("Random exception from normalTask")
}
atomicnt.getAndIncrement
Thread.sleep(2000 + Random.nextInt(5000))
println("Finished normaltask")
}
def testException() = {
val rg = (0 until 500)
val futures = rg.map(i =>{
Future(normalTask)
})
val res = failFast(futures)
Await.result(res, Duration.Inf) //blocking here to wait for all 500 to finish
}
def batchProcessing() {
try {
println("Starting batchProcessing")
testException()
println("Exiting batchProcessing")
} catch {
case t: Throwable => {
println("Error in main")
Thread.sleep(10000) //Here while waiting other futures are still running
t.printStackTrace()
// retry logic goes here based on failure or entire batch will be skipped
}
}
}
然而,当我在批处理中捕捉到异常时,其他期货仍在运行。
我尝试并行处理的另一种选择是使用并行集合,这似乎有效。即。如果任何任务失败,则整个并行操作将失败。但是,问题在于吞吐量,它受我可用的CPU数量的限制。由于所有任务都长时间运行并且阻止并行收集感觉不是正确的方法。
内置scala<code>Future</code>一旦启动就不能中断。
似乎您需要一些类似于monix任务或ZIO的东西,可以很容易地中断和重试。
我有一个可完成期货的列表,我想从第一个期货开始,如果有任何完成例外,我想尝试列表中的下一个期货,依此类推,直到我耗尽了我所有的期货。如果任何一个期货成功了,我想就此止步,而不使用列表中的下一个期货。我如何做到这一点?到目前为止,我已经尝试过: 但是当我测试这种方法时,我看到当未来完成失败时,会抛出异常,并且不会尝试下一组期货。 编辑: 这就是样本的样子
Java 8的< code > CompletableFuture . allof(CompletableFuture 如果我的一个期货异常完成,那么< code > completablefuture . allof 会在抛出< code>CompletionException之前等待其余期货完成,还是会取消其余期货? 如果它等待所有期货完成,有没有办法让它在任何期货抛出异常并取消剩余期货时立即
在这种情况下,我的做法是使用 将 转换为 。然后使用 获取结果。但是,可能有一个任务需要很长时间并且超时。在这种情况下,我仍然希望获得其余结果(同时并行运行所有任务)。可能吗?怎么办? 谢谢
假设我有一个抽象的“生产者”实例: 我需要对它产生的每个(或一些)对象进行一些处理。所以,我做了类似的事情: …并以<code>Future[Seq[Future[T]]]结束。这没关系,但有点麻烦。我想摆脱外部的,只需要就可以了,但我想不出一个(非阻塞)转换,可以让我这样做。 有什么想法吗?
我正在阅读Scala 2.11.8留档的函数在scala.concurrent.Future模块,它说: 将副作用函数应用于这个未来的结果,并返回一个包含这个未来的结果的新的未来。 这个方法允许强制回调以指定的顺序执行。 请注意,如果其中一个链式第四个回调引发异常,则该异常不会传播到后续的第四个调用。相反,随后的第四次回调将被赋予此未来的原始值。 我不确定< code >和不传播异常到底是什么意思
我在从JSON发送日期字段时收到一个错误。 Pojo类: 错误: JSON分析错误:无法从字符串“2018-07-10”反序列化类型的值:格式应为“yyyy-mm-dd hh:mm:ss.000”;嵌套异常为com.fasterxml.jackson.databind.exc.InvalidFormatException:无法从字符串“2018-07-10”反序列化类型的值:格式应为“yyyy-m