我有一系列相同类型的scala期货。
我想在一段有限的时间后,得到整个序列的结果。虽然有些期货可能已经成功,有些可能已经失败,有些还没有完成,但未完成的期货应该被视为失败。
我不想使用“依次等待每个未来”。
我确实看了这个问题:Scala等待未来序列,并尝试从那里使用解决方案,即:
private def lift[T](futures: Seq[Future[T]])(implicit ex: ExecutionContext) =
futures.map(_.map { Success(_) }.recover { case t => Failure(t) })
def waitAll[T](futures: Seq[Future[T]])(implicit ex: ExecutionContext) =
Future.sequence(lift(futures))
futures: Seq[Future[MyObject]] = ...
val segments = Await.result(waitAll(futures), waitTimeoutMillis millis)
但是我仍然有一个超时异常,我想是因为一些未来还没有完成。答案还指出,
现在,Future.sequence(lifted)将在每个Future完成时完成,并将使用Try表示成功和失败。
但是我希望我的Future在超时之后完成,而不是在序列中的每个Future都完成时。我还能做什么?
这是使用monix的解决方案
import monix.eval.Task
import monix.execution.Scheduler
val timeoutScheduler = Scheduler.singleThread("timeout") //it's safe to use single thread here because timeout tasks are very fast
def sequenceDiscardTimeouts[T](tasks: Task[T]*): Task[Seq[T]] = {
Task
.parSequence(
tasks
.map(t =>
t.map(Success.apply) // Map to success so we can collect the value
.timeout(500.millis)
.executeOn(timeoutScheduler) //This is needed to run timesouts in dedicated scheduler that won't be blocked by "blocking"/io work if you have any
.onErrorRecoverWith { ex =>
println("timed-out")
Task.pure(Failure(ex)) //It's assumed that any error is a timeout. It's possible to "catch" just timeout exception here
}
)
)
.map { res =>
res.collect { case Success(r) => r }
}
}
测试代码
implicit val mainScheduler = Scheduler.fixedPool(name = "main", poolSize = 10)
def slowTask(msg: String) = {
Task.sleep(Random.nextLong(1000).millis) //Sleep here to emulate a slow task
.map { _ =>
msg
}
}
val app = sequenceDiscardTimeouts(
slowTask("1"),
slowTask("2"),
slowTask("3"),
slowTask("4"),
slowTask("5"),
slowTask("6")
)
val started: Long = System.currentTimeMillis()
app.runSyncUnsafe().foreach(println)
println(s"Done in ${System.currentTimeMillis() - started} millis")
这将为每次运行打印不同的输出,但它应该如下所示
timed-out
timed-out
timed-out
3
4
5
Done in 564 millis
请注意使用了两个独立的调度程序。这是为了确保即使< code>main调度程序忙于业务逻辑,也会触发超时。您可以通过减少主调度程序的< code>poolSize来测试它。
如果我使用raw Future
(而不是内置了这种功能的IO monad,或者没有专门用于这种功能的Akka实用程序),我会编写如下实用程序:
// make each separate future timeout
object FutureTimeout {
// separate EC for waiting
private val timeoutEC: ExecutorContext = ...
private def timeout[T](delay: Long): Future[T] = Future {
blocking {
Thread.sleep(delay)
}
throw new Exception("Timeout")
}(timeoutEC)
def apply[T](fut: Future[T], delat: Long)(
implicit ec: ExecutionContext
): Future[T] = Future.firstCompletedOf(Seq(
fut,
timeout(delay)
))
}
然后
Future.sequence(
futures
.map(FutureTimeout(_, delay))
.map(Success(_))
.recover { case e => Failure(e) }
)
由于每个未来至多在< code>delay之后终止,我们将能够在那之后立即将它们收集到一个结果中。
但你必须记住,无论你如何触发超时,你都不能保证超时的未来
会停止执行。它可以在某个地方的某个线程上运行,只是你不会等待结果。firstCompletedOf
只是让这个种族更加明确。
其他一些实用程序(例如Cats Effect IO)允许您取消计算(这在像这样的比赛中使用),但是您仍然必须记住JVM不能任意“杀死”一个正在运行的线程,所以取消会发生在一个计算阶段完成之后,下一个阶段开始之前(例如在< code >之间)。映射或< code >。平面图。
如果你不害怕添加外部deps,还有其他(更可靠的,因为Thread.sleep
只是一个暂时的丑陋的黑客)方法来计时未来,比如Akka utils。另请参阅类似这样的其他问题。
现在我想让所有的期货最多等待n秒,直到全部完成。我知道我可以调用,但是如果我在循环中对我的所有期货顺序地调用它,则超时开始增加。伪代码: 块会出现超时,直到结果就绪。因此,如果第一个在超时之前完成,第二个也在超时之前完成,依此类推,则整个执行时间最多为而不是。 因此,我正在寻找一个方法,它接受的列表和一个超时,并行运行所有的结果,然后返回一个未来结果的集合。有什么想法吗?
假设我有几个未来,需要等到它们中的任何一个失败或全部成功。 例如:设有3个期货:、、。 > 如果成功而失败,我不会等待(并将失败返回给客户端)。 如果<code>f2</code>失败,而<code>f1</code>和<code>f3</code>仍在运行,我不会等待它们(并返回故障) 如果成功,然后成功,我继续等待。 你将如何实现它?
我在Scala中有一个要求,即运行一系列http调用,这些调用必须按顺序完成且不阻塞。我怎样才能做到这一点?
我对Selenium和Python是新手。我可以浏览网站,找到元素并打印出来,但速度很慢 Python版本:3.10;Selenium WebDrive:Firefox;IDE:PyCharm 2021.3.2(CE);操作系统:Fedora 35 VM 我试图打印的元素的HTML代码: 我的Python Selenium找到了大部分时间都能正常工作的代码(但当服务器需要很长时间才能响应时失败):
Html树: Xpath://table[@class='ur MatrixLayout urhtmltableReset']//tr//table//tr//td//div//div/span[contains(text(),'revisations')]
我启动了几个异步进程,如果需要,这些进程反过来可以启动更多的进程(想想遍历目录结构或类似的东西)。每个进程都会返回一些东西,最后我想等待所有这些进程的完成,并安排一个函数来处理结果集合。 我的解决方案尝试使用可变的(我不断添加我生成的期货)和来安排一些函数在此缓冲区中列出的所有这些期货完成后运行。 我准备了一个简单的例子来说明这个问题: 它首先调度和期货,然后将在1秒后的分辨率中调度。本身将在2秒