基本上,我在cassandra上运行两个期货查询,然后我需要做一些计算并返回值(值的平均值)。
这是我的代码:
object TestWrapFuture {
def main(args: Array[String]) {
val category = 5392
ExtensiveComputation.average(category).onComplete {
case Success(s) => println(s)
case Failure(f) => throw new Exception(f)
}
}
}
class ExtensiveComputation {
val volume = new ListBuffer[Int]()
def average(categoryId: Int): Future[Double] = {
val productsByCategory = Product.findProductsByCategory(categoryId)
productsByCategory.map { prods =>
for (prod <- prods if prod._2) {
Sku.findSkusByProductId(prod._1).map { skus =>
skus.foreach(sku => volume += (sku.height.get * sku.width.get * sku.length.get))
}
}
val average = volume.sum / volume.length
average
}
}
}
object ExtensiveComputation extends ExtensiveComputation
那么问题出在哪里呢?
skus.foreach 在 ListBuffer 中追加结果值。由于一切都是异步的,当我尝试在我的主数据库中获取结果时,我得到了一个错误,说我不能被零除。
事实上,由于我的Sku.findSkusByProduct返回一个Future,当我尝试计算平均值时,卷是空的。
我应该在此计算之前阻止任何内容,还是应该执行其他操作?
编辑
嗯,我试图这样阻止:
val volume = new ListBuffer[Int]()
def average(categoryId: Int): Future[Double] = {
val productsByCategory = Product.findProductsByCategory(categoryId)
val blocked = productsByCategory.map { prods =>
for (prod <- prods if prod._2) {
Sku.findSkusByProductId(prod._1).map { skus =>
skus.foreach(sku => volume += (sku.height.get * sku.width.get * sku.length.get))
}
}
}
Await.result(blocked, Duration.Inf)
val average = volume.sum / volume.length
Future.successful(average)
}
然后我从这段代码中得到了两个不同的结果:
Sku.findSkusByProductId(prod._1).map { skus =>
skus.foreach(sku => volume += (sku.height.get * sku.width.get * sku.length.get))
}
1-当卡桑德拉只有几个像50个的人需要寻找时,它就会运行并给我结果
2 - 当有很多像1000这样的人时,它给了我
java.lang.ArithmeticException:/by零
编辑2
我按照@Olivier Michallat的建议尝试了这个代码
def average(categoryId: Int): Future[Double] = {
val productsByCategory = Product.findProductsByCategory(categoryId)
productsByCategory.map { prods =>
for (prod <- prods if prod._2) findBlocking(prod._1)
volume.sum / volume.length
}
}
def findBlocking(productId: Long) = {
val future = Sku.findSkusByProductId(productId).map { skus =>
skus.foreach(sku => volume += (sku.height.get * sku.width.get * sku.length.get))
}
Await.result(future, Duration.Inf)
}
正如@kolmar提议的:
def average(categoryId: Int): Future[Int] = {
for {
prods <- Product.findProductsByCategory(categoryId)
filtered = prods.filter(_._2)
skus <- Future.traverse(filtered)(p => Sku.findSkusByProductId(p._1))
} yield {
val volumes = skus.flatten.map(sku => sku.height.get * sku.width.get * sku.length.get)
volumes.sum / volumes.size
}
}
两者都使用少量SKU查找,如50个,但都无法使用许多SKU查找到,如1000个,抛出算术异常:/by零
似乎在返回未来之前,它无法计算出一切......
由于您必须调用一个在参数序列上返回Future
的函数,因此最好使用Future.traverse
。
例如:
object ExtensiveComputation {
def average(categoryId: Int): Future[Double] = {
for {
products <- Product.findProductsByCategory(categoryId)
filtered = products.filter(_._2)
skus <- Future.traverse(filtered)(p => Sku.findSkusByProductId(p._1))
} yield {
val volumes = skus.map { sku =>
sku.height.get * sku.width.get * sku.length.get }
volumes.sum / volumes.size
}
}
}
您需要等到findSkusByProductId
生成的所有期货完成后,才能计算平均值。因此,将所有这些期货累积在序列中,调用
值,然后将该未来值映射到一个计算平均值的函数。然后将期货。在其上排序
,以获得一个未来[Seq]替换为productsByCategory。地图
,带有平面图
。
请看下面的代码,让我知道我哪里做错了? 使用: DSE版本-5.1.0 172.31.16.45:9042连接到测试群集。[cqlsh 5.0.1|Cassandra3.10.0.1652|DSE 5.1.0|CQL规范3.4.4|本地协议v4]使用HELP寻求帮助。 谢谢 斯卡拉 斯卡拉 斯卡拉 我在这里什么都得不到?甚至没有错误。
我正在围绕java库编写一个小的scala包装器。 Java库有一个对象QueryExecutor,它公开了2种方法: 执行(查询):结果 asyncExecute(查询):ListenableFuture[结果] 本文中的ListenableFuture是来自guava图书馆的。 我希望我的scala包装器返回一个Future[Result]而不是java对象,但我不确定实现它的最佳方法是什么。
我在eclipse中将scala项目转换为使用Maven(只需右键单击project并配置Maven build),这就创建了pom。xml,添加了正确的依赖项,它从maven存储库中提取了所需的JAR,但每当我尝试编译时,我都看不到在target\classes文件夹中生成类文件。然而,我在target\classes文件夹下的相应目录中看到了scala文件的实际源代码。我不确定它为什么要复制t
我在Java寻找与斯卡拉的未来相当的东西。 我正在寻找一种构造类型,它允许我将任务(s/s)提交到我选择的特定线程池,返回futures,允许我在任务完成时将一些逻辑(以非阻塞方式)链接到它。大概是这样的: Java的线程池(通过singleton提供)似乎总是返回标准的Javas,它只允许我调用阻塞。另一方面,据我所知,,更类似于Scala的promise,并且不绑定到线程池。 Java提供我想
我是Scala的新手,目前正在尝试使用play框架。 这是我写的工作代码: 嗯,这看起来不太好。我们能做得更好吗?我还不能。但我看到了stackoverflow的帖子:https://stackoverflow.com/a/24085333/3038183看起来很不错:) 现在我想知道如何转换我的代码,就像在给定的示例中一样。当然我已经试过了,但我无法让它编译,也不知道如何处理注释后的代码(“如何
我尝试使用I forest https://github.com/titicaca/spark-iforest,的scala实现,但是当我构建时(就像README中报告的< code>mvn clean package),它给我这些错误: 有人知道为什么吗?谢谢 scala版本2.11.12 火花版本2.4.0 maven版本3.5.2 我修改了pom.xml,调整了scala、spark和mav