错误:
org.apache.spark.SparkExc0019:RDD转换和操作只能由驱动程序调用,不能在其他转换内部调用;例如,rdd1.map(x=
def computeRatio(model: MatrixFactorizationModel, test_data: org.apache.spark.rdd.RDD[Rating]): Double = {
val numDistinctUsers = test_data.map(x => x.user).distinct().count()
val userRecs: RDD[(Int, Set[Int], Set[Int])] = test_data.groupBy(testUser => testUser.user).map(u => {
(u._1, u._2.map(p => p.product).toSet, model.recommendProducts(u._1, 20).map(prec => prec.product).toSet)
})
val hitsAndMiss: RDD[(Int, Double)] = userRecs.map(x => (x._1, x._2.intersect(x._3).size.toDouble))
val hits = hitsAndMiss.map(x => x._2).sum() / numDistinctUsers
return hits
}
我使用的方法在MatrixFactorizationModel.scala
,我必须映射用户,然后调用该方法来获得每个用户的结果。通过这样做,我引入了嵌套映射,我认为这会导致问题:
我知道这个问题实际上发生在:
val userRecs: RDD[(Int, Set[Int], Set[Int])] = test_data.groupBy(testUser => testUser.user).map(u => {
(u._1, u._2.map(p => p.product).toSet, model.recommendProducts(u._1, 20).map(prec => prec.product).toSet)
})
因为在映射过程中,我调用了model。推荐产品
MatrixFactorizationModel
是一个分布式模型,因此不能简单地从操作或转换调用它。最接近你在这里所做的事情是这样的:
import org.apache.spark.rdd.RDD
import org.apache.spark.mllib.recommendation.{MatrixFactorizationModel, Rating}
def computeRatio(model: MatrixFactorizationModel, testUsers: RDD[Rating]) = {
val testData = testUsers.map(r => (r.user, r.product)).groupByKey
val n = testData.count
val recommendations = model
.recommendProductsForUsers(20)
.mapValues(_.map(r => r.product))
val hits = testData
.join(recommendations)
.values
.map{case (xs, ys) => xs.toSet.intersect(ys.toSet).size}
.sum
hits / n
}
笔记:
不同的
是一个昂贵的操作,在这里完全过时,因为您可以从分组数据中获得相同的信息group
后跟投影(map
),先项目后组。如果您只想要产品ID,则没有理由传输完整的评级。主要内容:转换,操作RDD提供两种类型的操作: 转换 行动 转换 在Spark中,转换的作用是从现有数据集创建新数据集。转换是惰性的,因为它们仅在动作需要将结果返回到驱动程序时才计算。 下面来看看一些常用的RDD转换。 - 它返回一个新的分布式数据集, 该数据集是通过函数传递源的每个元素而形成的。 - 它返回一个新数据集, 该数据集是通过选择函数返回的源元素而形成的。 - 这里,每个输入项可以映射到零个或多个输出项,
RDDs 支持 2 种类型的操作:转换(transformations) 从已经存在的数据集中创建一个新的数据集;动作(actions) 在数据集上进行计算之后返回一个值到驱动程序。例如,map 是一个转换操作,它将每一个数据集元素传递给一个函数并且返回一个新的 RDD。另一方面,reduce 是一个动作,它使用相同的函数来聚合 RDD 的所有元素,并且将最终的结果返回到驱动程序(不过也有一个并行
问题内容: 我有一个包含用户信息的数据库,我想创建一个公共静态变量,以便在任何给定时间返回数据库整数,而不必为每个整数都做一个空,但这给了我这个错误: 这是我的代码: 有人知道我在做什么错吗?我尝试使用Google的东西,但是更改代码将我从一个错误转移到了另一个错误…因此,我现在还不确定。 我正在运行JDBC驱动程序sqlite-jdbc-3.8.11.2.jar 问题答案: 未实现。我想你只是想
访问 获取子节点的Path 为了得到一个AST节点的属性值,我们一般先访问到该节点,然后利用 path.node.property 方法即可。 // the BinaryExpression AST node has properties: `left`, `right`, `operator` BinaryExpression(path) { path.node.left; path.n
驱动程序版本为: 我的问题是,当我使用api find和一些来自java的过滤器时,操作需要15秒。 我检查了mongo服务器日志文件,发现跟踪是一个命令,而不是一个查询: 2015-09-01T12:11:47.496+0200I命令[conn503]命令b.$CMD命令:计数{count:“logs”,查询:{timestamp:{$GTE:新日期(1433109600000)},aplica
我在JBoss EAP7上通过这个示例安装了mssql驱动程序。启动将引发以下错误: 由:java.lang.noClassDefFounderRor:javax/xml/bind/datatypeConverter at com.microsoft.sqlserver.jdbc.sqlServerConnection.sendLogon(SqlServerConnection.java:4098