当前位置: 首页 > 知识库问答 >
问题:

scala中如何控制future.sequence的并发性?

裴嘉许
2023-03-14

我知道我可以通过以下方式将Seq[Future[T]]转换为Future[Seq[T]]

  val seqFuture = Future.sequence(seqOfFutures)
  seqFuture.map((seqT: Seq[T]) => {...})

我现在的问题是,我在那个序列中有700个未来,我希望能够控制并行解决其中的多少个,因为每个未来都将调用内部rest api,同时有700个请求就像对那个服务器发起dos攻击

我宁愿一次只能解决10个期货。

我如何才能做到这一点?

尝试pamu的答案,我看到了错误:

[error] /home/philipp/src/bluebat/src/main/scala/com/dreamlines/metronome/service/JobFetcher.scala:32:44: com.dreamlines.commons.LazyFuture[A] does not take parameters
[error]         val batch = Future.sequence(c.map(_()))
[error]                                            ^
[error] /home/philipp/src/bluebat/src/main/scala/com/dreamlines/metronome/service/JobFetcher.scala:32:28: no type parameters for method sequence: (in: M[scala.concurrent.Future[A]])(implicit cbf: scala.collection.generic.CanBuildFrom[M[scala.concurrent.Future[A]],A,M[A]], implicit executor: scala.concurrent.ExecutionContext)scala.concurrent.Future[M[A]] exist so that it can be applied to arguments (List[Nothing])
[error]  --- because ---
[error] argument expression's type is not compatible with formal parameter type;
[error]  found   : List[Nothing]
[error]  required: ?M[scala.concurrent.Future[?A]]
[error]         val batch = Future.sequence(c.map(_()))
[error]                            ^
[error] /home/philipp/src/bluebat/src/main/scala/com/dreamlines/metronome/service/JobFetcher.scala:32:42: type mismatch;
[error]  found   : List[Nothing]
[error]  required: M[scala.concurrent.Future[A]]
[error]         val batch = Future.sequence(c.map(_()))
[error]                                          ^
[error] /home/philipp/src/bluebat/src/main/scala/com/dreamlines/metronome/service/JobFetcher.scala:32:36: Cannot construct a collection of type M[A] with elements of type A based on a collection of type M[scala.concurrent.Future[A]].
[error]         val batch = Future.sequence(c.map(_()))
[error]                                    ^
[error] four errors found

共有2个答案

董康平
2023-03-14

并发是Scala的未来s由ExecutionContext控制。请注意,futures在创建后立即开始在上下文上执行,因此Future的ExecutionContext。序列并不重要。从序列创建原始期货时,必须提供适当的上下文。

默认上下文<code>ExecutionContext。全局(通常通过<code>导入scala.concurrent.ExecutionContext.Implicits.global)使用与处理器内核数量相同的线程,但它也可以为阻塞任务创建许多附加线程,这些线程封装在<code>scala.concurrent.blocking中。这通常是理想的行为,但不适合您的问题。

幸运的是,您可以使用< code > execution context . from executor 方法来包装Java线程池。例如:

import java.util.concurrent.Executors
import scala.concurrent.ExecutionContext

val context = ExecutionContext.fromExecutor(Executors.newFixedThreadPool(10))
val seqOfFutures = Seq.fill(700)(Future { callRestApi() }(context))
val sequenceFuture = Future.sequence(seqOfFutures)(ExecutionContext.global)

当然,上下文也可以隐式提供:

implicit val context: ExecutionContext = 
  ExecutionContext.fromExecutor(Executors.newFixedThreadPool(10))
val seqOfFutures = Seq.fill(700)(Future { callRestApi() })
// This `sequence` uses the same thread pool as the original futures
val sequenceFuture = Future.sequence(seqOfFutures) 
郎泰平
2023-03-14

简单的 foldLeft 可用于控制一次并发运行的期货数量。

首先,让我们创建一个名为< code>LazyFuture的case类

case class LazyFuture[+A](f: Unit => Future[A]) {
  def apply() = f()
}

object LazyFuture {
  def apply[A](f: => A)(implicit ec: ExecutionContext): LazyFuture[A] = LazyFuture(_ => Future(f))

  def apply[A](f: => Future[A])(implicit ec: ExecutionContext): LazyFuture[A] = LazyFuture(_ => f)
}

LazyFuture会立即停止future的运行

val list: List[LazyFuture[A]] = ...


list.grouped(concurFactor).foldLeft(Future.successful(List.empty[A])){ (r, c) =>
  val batch = Future.sequence(c.map(_()))
  batch.flatMap(values => r.map(rs => rs ++ values))
}

相应地更改concurFactor以同时运行多个期货。

concurFactor of 1 将一次运行一个未来

2的concurFactor将同时运行两个期货

等等...

def executeBatch[A](list: List[LazyFuture[A]])(concurFactor: Int) =
   list.grouped(concurFactor).foldLeft(Future.successful(List.empty[A])){ (r, c) =>
      val batch = Future.sequence(c.map(_()))
      r.flatMap(rs => batch.map(values => rs ++ values))
    }
  case class LazyFuture[+A](f: Unit => Future[A]) {
    def apply() = f()
  }

  object LazyFuture {
    def apply[A](f: => A)(implicit ec: ExecutionContext): LazyFuture[A] = LazyFuture(_ => Future(f))

    def apply[A](f: => Future[A])(implicit ec: ExecutionContext): LazyFuture[A] = LazyFuture(_ => f)
  }

  def executeBatch[A](list: List[LazyFuture[A]])(concurFactor: Int)(implicit ec: ExecutionContext): Future[List[A]] =
    list.grouped(concurFactor).foldLeft(Future.successful(List.empty[A])) { (r, c) =>
      val batch = Future.sequence(c.map(_ ()))
      r.flatMap(rs => batch.map(values => rs ++ values))
    }

您还可以通过限制执行池中的线程数量来限制计算资源。但是,这种解决方案并不灵活。就我个人而言,我不喜欢它。

val context: ExecutionContext = 
  ExecutionContext.fromExecutor(Executors.newFixedThreadPool(8))

您必须记住传递正确的执行上下文,这是一个隐式值。有时我们不知道哪个隐式在范围内。这是越野车

当未来像下面这样构造时

val foo = Future {
     1 + 2
} // future starts executing

LazyFuture(foo) // Not a right way

foo已经开始执行,无法控制。

构建LazyFuture的正确方法

val foo = LazyFuture {
  1 + 2
}

或者

val foo = LazyFuture {
  Future {
   1 + 2
  }
}
package main

import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration.Duration

object Main {

  case class LazyFuture[A](f: Unit => Future[A]) {
    def apply(): Future[A] = f()
  }

  object LazyFuture {
    def apply[A](f: => A)(implicit ec: ExecutionContext): LazyFuture[A] = LazyFuture(_ => Future(f))
    def apply[A](f: => Future[A]): LazyFuture[A] = LazyFuture(_ => f)
  }

  def executeBatch[A](list: List[LazyFuture[A]])(concurFactor: Int)
    (implicit ec: ExecutionContext): Future[List[A]] =
    list.grouped(concurFactor).foldLeft(Future.successful(List.empty[A])) { (r, c) =>
      val batch = Future.sequence(c.map(_ ()))
      r.flatMap(rs => r.map(values=> rs ++ values))
    }

  def main(args: Array[String]): Unit = {
    import scala.concurrent.ExecutionContext.Implicits.global


    val futures: Seq[LazyFuture[Int]] = List(1, 2, 3, 4, 5).map { value =>
      LazyFuture {
        println(s"value: $value started")
        Thread.sleep(value * 200)
        println(s"value: $value stopped")
        value
      }
    }
    val f = executeBatch(futures.toList)(2)
    Await.result(f, Duration.Inf)
  }

}
 类似资料:
  • 我正在使用kafka-node ConsumerGroup来消费来自主题的消息。ConsumerGroup在使用消息时需要调用外部API,这可能需要一秒钟的时间来响应。我希望控制从队列中消费下一条消息,直到我从API得到响应,这样消息就会被顺序地处理。 我该如何控制这种行为?

  • 主要内容:并发控制的问题在并发控制中,可以同时执行多个事务。 它可能会影响事务结果。保持这些事务的执行顺序非常重要。 并发控制的问题 并发事务以不受控制的方式执行时可能会出现几个问题。 以下是并发控制中的三个问题。 更新丢失 脏读 不可重复读取 1. 更新丢失 当访问相同数据库项的两个事务包含其操作时,某些数据库项的值不正确,则会发生丢失的更新问题。 如果两个事务T1和T2读取记录然后更新它,那么第二个更新将覆盖更新第一

  • 配置样例 样例 1 限制 com.foo.BarService 的每个方法,服务器端并发执行(或占用线程池线程数)不能超过 10 个: <dubbo:service interface="com.foo.BarService" executes="10" /> 样例 2 限制 com.foo.BarService 的 sayHello 方法,服务器端并发执行(或占用线程池线程数)不能超过 10

  • 如果一个语言要实现支持并发执行的接口,则一般来说需要在并发控制上下功夫,原因就是前面说的,由于虚拟机实现的细节问题,直接依赖宿主环境的并发容易出问题。简单地,以使用宿主的线程为例。假如源语言的线程对应宿主环境的真线程,那么同步操作就需要用到线程间的互斥量,比如锁,信号量等 一个程序需要并发,一般来说有三个原因: 一,为充分利用多核cpu资源,提高计算速度。这个原因是很重要,但在实际中其重要性我觉得

  • 问题内容: 我已经使用了Spring Security 3.0.7,并且正在我的项目中实现并发控制。但这是行不通的。我用过了 甚至我尝试了Spring安全参考中的解决方案,但没有成功。这是我的配置文件内容: 我收到以下异常: 有人可以帮忙解决这个问题吗? 问题答案: 如果您已经编写了和(您自己的实现),则应该重写Object 和方法。

  • 一、概述: 在SQLite中,锁和并发控制机制都是由pager_module模块负责处理的,如ACID(Atomic, Consistent, Isolated, and Durable)。在含有数据修改的事务中,该模块将确保或者所有的数据修改全部提交,或者全部回滚。与此同时,该模块还提供了一些磁盘文件的内存Cache功能。 事实上,pager_module模块并不关心数据库存储的细节,如B-Tr