当前位置: 首页 > 知识库问答 >
问题:

卡在斯卡拉的未来

范修伟
2023-03-14

基本上,我在cassandra上运行两个期货查询,然后我需要做一些计算并返回值(值的平均值)。

这是我的代码:

object TestWrapFuture {
  def main(args: Array[String]) {
    val category = 5392
    ExtensiveComputation.average(category).onComplete {
      case Success(s) => println(s)
      case Failure(f) => throw new Exception(f)
    }
  }
}

class ExtensiveComputation {

  val volume = new ListBuffer[Int]()

  def average(categoryId: Int): Future[Double] = {

    val productsByCategory = Product.findProductsByCategory(categoryId)

    productsByCategory.map { prods =>
      for (prod <- prods if prod._2) {
        Sku.findSkusByProductId(prod._1).map { skus =>
          skus.foreach(sku => volume += (sku.height.get * sku.width.get * sku.length.get))
        }
      }

      val average = volume.sum / volume.length
      average
    }
  }
}

object ExtensiveComputation extends ExtensiveComputation

那么问题出在哪里呢?

skus.foreach 在 ListBuffer 中追加结果值。由于一切都是异步的,当我尝试在我的主数据库中获取结果时,我得到了一个错误,说我不能被零除。

事实上,由于我的Sku.findSkusByProduct返回一个Future,当我尝试计算平均值时,卷是空的。

我应该在此计算之前阻止任何内容,还是应该执行其他操作?

编辑

嗯,我试图这样阻止:

  val volume = new ListBuffer[Int]()

  def average(categoryId: Int): Future[Double] = {

    val productsByCategory = Product.findProductsByCategory(categoryId)

    val blocked = productsByCategory.map { prods =>
      for (prod <- prods if prod._2) {
        Sku.findSkusByProductId(prod._1).map { skus =>
          skus.foreach(sku => volume += (sku.height.get * sku.width.get * sku.length.get))
        }
      }
    }

    Await.result(blocked, Duration.Inf)
    val average = volume.sum / volume.length
    Future.successful(average)
  }

然后我从这段代码中得到了两个不同的结果:

    Sku.findSkusByProductId(prod._1).map { skus =>
          skus.foreach(sku => volume += (sku.height.get * sku.width.get * sku.length.get))
        }

1-当卡桑德拉只有几个像50个的人需要寻找时,它就会运行并给我结果

2 - 当有很多像1000这样的人时,它给了我

java.lang.ArithmeticException:/by零

编辑2

我按照@Olivier Michallat的建议尝试了这个代码

   def average(categoryId: Int): Future[Double] = {

    val productsByCategory = Product.findProductsByCategory(categoryId)

    productsByCategory.map { prods =>
      for (prod <- prods if prod._2) findBlocking(prod._1)
      volume.sum / volume.length
    }
  }

  def findBlocking(productId: Long) = {
    val future = Sku.findSkusByProductId(productId).map { skus =>
      skus.foreach(sku => volume += (sku.height.get * sku.width.get * sku.length.get))
    }

    Await.result(future, Duration.Inf)
  }

正如@kolmar提议的:

   def average(categoryId: Int): Future[Int] = {
    for {
      prods <- Product.findProductsByCategory(categoryId)
      filtered = prods.filter(_._2)
      skus <- Future.traverse(filtered)(p => Sku.findSkusByProductId(p._1))
    } yield {
      val volumes = skus.flatten.map(sku => sku.height.get * sku.width.get * sku.length.get)
      volumes.sum / volumes.size
    }
  }

两者都使用少量SKU查找,如50个,但都无法使用许多SKU查找到,如1000个,抛出算术异常:/by零

似乎在返回未来之前,它无法计算出一切......

共有2个答案

乌杰
2023-03-14

由于您必须调用一个在参数序列上返回Future的函数,因此最好使用Future.traverse

例如:

object ExtensiveComputation {
  def average(categoryId: Int): Future[Double] = {
    for {
      products <- Product.findProductsByCategory(categoryId)
      filtered = products.filter(_._2)
      skus <- Future.traverse(filtered)(p => Sku.findSkusByProductId(p._1))
    } yield {
      val volumes = skus.map { sku => 
        sku.height.get * sku.width.get * sku.length.get }
      volumes.sum / volumes.size
    }
  }
}
冀胤运
2023-03-14

您需要等到findSkusByProductId生成的所有期货完成后,才能计算平均值。因此,将所有这些期货累积在序列中,调用期货。在其上排序,以获得一个未来[Seq]值,然后将该未来值映射到一个计算平均值的函数。然后将替换为productsByCategory。地图,带有平面图

 类似资料:
  • 请看下面的代码,让我知道我哪里做错了? 使用: DSE版本-5.1.0 172.31.16.45:9042连接到测试群集。[cqlsh 5.0.1|Cassandra3.10.0.1652|DSE 5.1.0|CQL规范3.4.4|本地协议v4]使用HELP寻求帮助。 谢谢 斯卡拉 斯卡拉 斯卡拉 我在这里什么都得不到?甚至没有错误。

  • 我正在围绕java库编写一个小的scala包装器。 Java库有一个对象QueryExecutor,它公开了2种方法: 执行(查询):结果 asyncExecute(查询):ListenableFuture[结果] 本文中的ListenableFuture是来自guava图书馆的。 我希望我的scala包装器返回一个Future[Result]而不是java对象,但我不确定实现它的最佳方法是什么。

  • 我在eclipse中将scala项目转换为使用Maven(只需右键单击project并配置Maven build),这就创建了pom。xml,添加了正确的依赖项,它从maven存储库中提取了所需的JAR,但每当我尝试编译时,我都看不到在target\classes文件夹中生成类文件。然而,我在target\classes文件夹下的相应目录中看到了scala文件的实际源代码。我不确定它为什么要复制t

  • 我在Java寻找与斯卡拉的未来相当的东西。 我正在寻找一种构造类型,它允许我将任务(s/s)提交到我选择的特定线程池,返回futures,允许我在任务完成时将一些逻辑(以非阻塞方式)链接到它。大概是这样的: Java的线程池(通过singleton提供)似乎总是返回标准的Javas,它只允许我调用阻塞。另一方面,据我所知,,更类似于Scala的promise,并且不绑定到线程池。 Java提供我想

  • 我是Scala的新手,目前正在尝试使用play框架。 这是我写的工作代码: 嗯,这看起来不太好。我们能做得更好吗?我还不能。但我看到了stackoverflow的帖子:https://stackoverflow.com/a/24085333/3038183看起来很不错:) 现在我想知道如何转换我的代码,就像在给定的示例中一样。当然我已经试过了,但我无法让它编译,也不知道如何处理注释后的代码(“如何

  • 我尝试使用I forest https://github.com/titicaca/spark-iforest,的scala实现,但是当我构建时(就像README中报告的< code>mvn clean package),它给我这些错误: 有人知道为什么吗?谢谢 scala版本2.11.12 火花版本2.4.0 maven版本3.5.2 我修改了pom.xml,调整了scala、spark和mav