当前位置: 首页 > 知识库问答 >
问题:

RDD转换和操作只能由驱动程序调用

宰父宾实
2023-03-14

错误:

org.apache.spark.SparkExc0019:RDD转换和操作只能由驱动程序调用,不能在其他转换内部调用;例如,rdd1.map(x=

def computeRatio(model: MatrixFactorizationModel, test_data: org.apache.spark.rdd.RDD[Rating]): Double = {
  val numDistinctUsers = test_data.map(x => x.user).distinct().count()
  val userRecs: RDD[(Int, Set[Int], Set[Int])] = test_data.groupBy(testUser => testUser.user).map(u => {
    (u._1, u._2.map(p => p.product).toSet, model.recommendProducts(u._1, 20).map(prec => prec.product).toSet)
  })
  val hitsAndMiss: RDD[(Int, Double)] = userRecs.map(x => (x._1, x._2.intersect(x._3).size.toDouble))

  val hits = hitsAndMiss.map(x => x._2).sum() / numDistinctUsers

  return hits
}

我使用的方法在MatrixFactorizationModel.scala,我必须映射用户,然后调用该方法来获得每个用户的结果。通过这样做,我引入了嵌套映射,我认为这会导致问题:

我知道这个问题实际上发生在:

val userRecs: RDD[(Int, Set[Int], Set[Int])] = test_data.groupBy(testUser => testUser.user).map(u => {
  (u._1, u._2.map(p => p.product).toSet, model.recommendProducts(u._1, 20).map(prec => prec.product).toSet)
})

因为在映射过程中,我调用了model。推荐产品

共有1个答案

徐唯
2023-03-14

MatrixFactorizationModel是一个分布式模型,因此不能简单地从操作或转换调用它。最接近你在这里所做的事情是这样的:

import org.apache.spark.rdd.RDD
import org.apache.spark.mllib.recommendation.{MatrixFactorizationModel, Rating}

def computeRatio(model: MatrixFactorizationModel, testUsers: RDD[Rating]) = {
  val testData = testUsers.map(r => (r.user, r.product)).groupByKey
  val n = testData.count

  val recommendations = model
     .recommendProductsForUsers(20)
     .mapValues(_.map(r => r.product))

  val hits = testData
    .join(recommendations)
    .values
    .map{case (xs, ys) => xs.toSet.intersect(ys.toSet).size}
    .sum

  hits / n
}

笔记

  • 不同的是一个昂贵的操作,在这里完全过时,因为您可以从分组数据中获得相同的信息
  • 而不是group后跟投影(map),先项目后组。如果您只想要产品ID,则没有理由传输完整的评级。
 类似资料:
  • 主要内容:转换,操作RDD提供两种类型的操作: 转换 行动 转换 在Spark中,转换的作用是从现有数据集创建新数据集。转换是惰性的,因为它们仅在动作需要将结果返回到驱动程序时才计算。 下面来看看一些常用的RDD转换。 - 它返回一个新的分布式数据集, 该数据集是通过函数传递源的每个元素而形成的。 - 它返回一个新数据集, 该数据集是通过选择函数返回的源元素而形成的。 - 这里,每个输入项可以映射到零个或多个输出项,

  • RDDs 支持 2 种类型的操作:转换(transformations) 从已经存在的数据集中创建一个新的数据集;动作(actions) 在数据集上进行计算之后返回一个值到驱动程序。例如,map 是一个转换操作,它将每一个数据集元素传递给一个函数并且返回一个新的 RDD。另一方面,reduce 是一个动作,它使用相同的函数来聚合 RDD 的所有元素,并且将最终的结果返回到驱动程序(不过也有一个并行

  • 问题内容: 我有一个包含用户信息的数据库,我想创建一个公共静态变量,以便在任何给定时间返回数据库整数,而不必为每个整数都做一个空,但这给了我这个错误: 这是我的代码: 有人知道我在做什么错吗?我尝试使用Google的东西,但是更改代码将我从一个错误转移到了另一个错误…因此,我现在还不确定。 我正在运行JDBC驱动程序sqlite-jdbc-3.8.11.2.jar 问题答案: 未实现。我想你只是想

  • 访问 获取子节点的Path 为了得到一个AST节点的属性值,我们一般先访问到该节点,然后利用 path.node.property 方法即可。 // the BinaryExpression AST node has properties: `left`, `right`, `operator` BinaryExpression(path) { path.node.left; path.n

  • 驱动程序版本为: 我的问题是,当我使用api find和一些来自java的过滤器时,操作需要15秒。 我检查了mongo服务器日志文件,发现跟踪是一个命令,而不是一个查询: 2015-09-01T12:11:47.496+0200I命令[conn503]命令b.$CMD命令:计数{count:“logs”,查询:{timestamp:{$GTE:新日期(1433109600000)},aplica

  • 我在JBoss EAP7上通过这个示例安装了mssql驱动程序。启动将引发以下错误: 由:java.lang.noClassDefFounderRor:javax/xml/bind/datatypeConverter at com.microsoft.sqlserver.jdbc.sqlServerConnection.sendLogon(SqlServerConnection.java:4098