当前位置: 首页 > 知识库问答 >
问题:

Scala:加入/等待不断增长的期货队列

郗唯
2023-03-14

我启动了几个异步进程,如果需要,这些进程反过来可以启动更多的进程(想想遍历目录结构或类似的东西)。每个进程都会返回一些东西,最后我想等待所有这些进程的完成,并安排一个函数来处理结果集合。

我的解决方案尝试使用可变的ListBuffer(我不断添加我生成的期货)和Future.sequence来安排一些函数在此缓冲区中列出的所有这些期货完成后运行。

我准备了一个简单的例子来说明这个问题:

object FuturesTest extends App {
  var queue = ListBuffer[Future[Int]]()

  val f1 = Future {
    Thread.sleep(1000)
    val f3 = Future {
      Thread.sleep(2000)
      Console.println(s"f3: 1+2=3 sec; queue = $queue")
      3
    }
    queue += f3
    Console.println(s"f1: 1 sec; queue = $queue")
    1
  }
  val f2 = Future {
    Thread.sleep(2000)
    Console.println(s"f2: 2 sec; queue = $queue")
    2
  }

  queue += f1
  queue += f2
  Console.println(s"starting; queue = $queue")

  Future.sequence(queue).foreach(
    (all) => Console.println(s"Future.sequence finished with $all")
  )

  Thread.sleep(5000) // simulates app being alive later
}

它首先调度f1f2期货,然后f3将在1秒后的f1分辨率中调度。f3本身将在2秒内解析。因此,我期望得到的是以下内容:

starting; queue = ListBuffer(Future(<not completed>), Future(<not completed>))
f1: 1 sec; queue = ListBuffer(Future(<not completed>), Future(<not completed>), Future(<not completed>))
f2: 2 sec; queue = ListBuffer(Future(Success(1)), Future(<not completed>), Future(<not completed>))
f3: 1+2=3 sec; queue = ListBuffer(Future(Success(1)), Future(Success(2)), Future(<not completed>))
Future.sequence finished with ListBuffer(1, 2, 3)

但是,我实际上得到了:

starting; queue = ListBuffer(Future(<not completed>), Future(<not completed>))
f1: 1 sec; queue = ListBuffer(Future(<not completed>), Future(<not completed>), Future(<not completed>))
f2: 2 sec; queue = ListBuffer(Future(Success(1)), Future(<not completed>), Future(<not completed>))
Future.sequence finished with ListBuffer(1, 2)
f3: 1+2=3 sec; queue = ListBuffer(Future(Success(1)), Future(Success(2)), Future(<not completed>))

...这很可能是因为我们等待的未来列表在最初调用< code>Future.sequence时是固定的,以后不会改变。

最终,我用这段代码让它按照我想要的方式运行:

  waitForSequence(queue, (all: ListBuffer[Int]) => Console.println(s"finished with $all"))

  def waitForSequence[T](queue: ListBuffer[Future[T]], act: (ListBuffer[T] => Unit)): Unit = {
    val seq = Future.sequence(queue)
    seq.onComplete {
      case Success(res) =>
        if (res.size < queue.size) {
          Console.println("... still waiting for tasks")
          waitForSequence(queue, act)
        } else {
          act(res)
        }
      case Failure(exc) =>
        throw exc
    }
  }

这是按计划进行的,最终实现了所有3种未来:

starting; queue = ListBuffer(Future(<not completed>), Future(<not completed>))
f1: 1 sec; queue = ListBuffer(Future(<not completed>), Future(<not completed>), Future(<not completed>))
f2: 2 sec; queue = ListBuffer(Future(Success(1)), Future(<not completed>), Future(<not completed>))
... still waiting for tasks
f3: 1+2=3 sec; queue = ListBuffer(Future(Success(1)), Future(Success(2)), Future(<not completed>))
finished with ListBuffer(1, 2, 3)

但还是很丑。如果它在完成时看到队列比结果数长,它就重新开始< code>Future.sequence等待,希望下次完成时情况会好一些。当然,这是不好的,因为它耗尽了堆栈,并且如果在创建future和将其添加到队列之间的一个小窗口内触发该检查,则可能容易出错。

是否可以在不用Akka重写所有内容或诉诸于使用Reit.result的情况下执行此操作(由于我的代码是为Scala编译的,因此我实际上无法使用.js)。

共有3个答案

端木野
2023-03-14

我以后不会涉及。序列:它将操作并行化,您似乎在寻找顺序异步执行。此外,您可能不需要在定义后立即启动期货。构图应该是这样的:

def run[T](queue: List[() => Future[T]]): Future[List[T]] = {
  (Future.successful(List.empty[T]) /: queue)(case (f1, f2) =>
  f1() flatMap (h => )
  )

val t0 = now

def f(n: Int): () => Future[String] = () => {
  println(s"starting $n")
  Future[String] {
    Thread.sleep(100*n)
    s"<<$n/${now - t0}>>"
  }
}

println(Await.result(run(f(7)::f(10)::f(20)::f(3)::Nil), 20 seconds))

诀窍不是过早地推出期货;这就是为什么我们有f(n),直到我们用()调用它才会开始。

郎曜文
2023-03-14

正确的方法可能是构建你的未来。具体地说,f1不应该只是从f3开始,它可能应该是f3的平面图——也就是说,在f3解决之前,f1的未来不会解决。

请记住,Future.sequence是一种后备选项,仅在Futures完全断开连接时使用。在您描述的情况下,存在真正的依赖关系,这些在您实际返回的Futures中得到了最好的表示。使用Futures时,平面图是您的朋友,应该是您首先使用的工具之一。(通常但不总是作为用于理解。)

可以肯定地说,如果你想要一个可变的未来队列,代码的结构并不正确,有一个更好的方法可以做到这一点。特别是在Scala.js中(我的大部分代码都在那里,并且非常依赖于未来),我经常使用对未来的理解——我认为这是唯一明智的操作方式...

戴瑞
2023-03-14

就像Justin提到的,你不能失去对其他期货内部生成的期货的引用,你应该使用map和flatMap来链接它们。

val f1 = Future {
  Thread.sleep(1000)
  val f3 = Future {
    Thread.sleep(2000)
    Console.println(s"f3: 1+2=3 sec")
    3
  }
  f3.map{
    r =>
      Console.println(s"f1: 1 sec;")
      Seq(1, r)
  }
}.flatMap(identity)

val f2 = Future {
  Thread.sleep(2000)
  Console.println(s"f2: 2 sec;")
  Seq(2)
}

val futures = Seq(f1, f2)

Future.sequence(futures).foreach(
  (all) => Console.println(s"Future.sequence finished with ${all.flatten}")
)

Thread.sleep(5000) // simulates app being alive later

这适用于最小的示例,我不确定它是否适用于您的实际用例。结果是:

f2: 2 sec;
f3: 1+2=3 sec
f1: 1 sec;
Future.sequence finished with List(1, 3, 2)
 类似资料:
  • 我必须从带有Scala的RESTAPI中获取给定列表中每个文件的问题列表。我想并行处理这些请求,并使用调度库来实现。我的方法是从Java框架调用的,我必须在该方法结束时等待所有未来的结果,以将整体结果返回到框架。这是我的密码: 这段代码有几个问题。首先,我没有得到我期望的< code>issuesByFile (1)的类型。如果< code>findLookUpId无法找到查找Id(即< code

  • 假设我有几个未来,需要等到它们中的任何一个失败或全部成功。 例如:设有3个期货:、、。 > 如果成功而失败,我不会等待(并将失败返回给客户端)。 如果<code>f2</code>失败,而<code>f1</code>和<code>f3</code>仍在运行,我不会等待它们(并返回故障) 如果成功,然后成功,我继续等待。 你将如何实现它?

  • 我有一个未来的列表,在每个未来完成后,我有一个应该执行的回调。 我在使用期货。successfulAsList检查是否所有期货都已完成。然而,这并没有考虑回调的完成。 有没有办法确保回调完成? 我可以用期货代替回调。转换为包装到另一个未来,并检查其是否完成。然而,这样一来,我就无法访问包装好的将来引发的运行时异常。

  • 等待队列 到目前为止,我们的实验中,用户进程或内核线程还没有睡眠的支持机制。在课程中提到用户进程或内核线程可以转入等待状态以等待某个特定事件(比如睡眠,等待子进程结束,等待信号量等),当该事件发生时这些进程能够被再次唤醒。内核实现这一功能的一个底层支撑机制就是等待队列wait_queue,等待队列和每一个事件(睡眠结束、时钟到达、任务完成、资源可用等)联系起来。需要等待事件的进程在转入休眠状态后插

  • 等待队列接口 结构体 struct   rt_wqueue   等待队列控制块 更多...   struct   rt_wqueue_node   等待队列节点 更多...   宏定义 #define  RT_WQ_FLAG_CLEAN   0x00   等待队列清除   #define  RT_WQ_FLAG_WAKEUP   0x01   等待队列唤醒   #define  DEFINE_WA

  • 现在我想让所有的期货最多等待n秒,直到全部完成。我知道我可以调用,但是如果我在循环中对我的所有期货顺序地调用它,则超时开始增加。伪代码: 块会出现超时,直到结果就绪。因此,如果第一个在超时之前完成,第二个也在超时之前完成,依此类推,则整个执行时间最多为而不是。 因此,我正在寻找一个方法,它接受的列表和一个超时,并行运行所有的结果,然后返回一个未来结果的集合。有什么想法吗?