当前位置: 首页 > 知识库问答 >
问题:

将RDD[org.apache.spark.sql.Row]转换为RDD[org.apache.spark.mllib.linalg.Vector]

班承德
2023-03-14

我对Spark和Scala相对较新。

我从以下数据帧开始(由密集的双倍向量组成的单列):

scala> val scaledDataOnly_pruned = scaledDataOnly.select("features")
scaledDataOnly_pruned: org.apache.spark.sql.DataFrame = [features: vector]

scala> scaledDataOnly_pruned.show(5)
+--------------------+
|            features|
+--------------------+
|[-0.0948337274182...|
|[-0.0948337274182...|
|[-0.0948337274182...|
|[-0.0948337274182...|
|[-0.0948337274182...|
+--------------------+

直接转换为RDD将生成一个org实例。阿帕奇。火花rdd。RDD[org.apache.spark.sql.Row]:

scala> val scaledDataOnly_rdd = scaledDataOnly_pruned.rdd
scaledDataOnly_rdd: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = MapPartitionsRDD[32] at rdd at <console>:66

有人知道如何将此DF转换为org.apache.spark.rdd.RDD[org.apache.spark.mllib.linalg.向量]的实例吗?到目前为止,我的各种尝试都没有成功。

提前感谢您的指点!

共有3个答案

翟聪
2023-03-14
import org.apache.spark.mllib.linalg.Vectors

scaledDataOnly
   .rdd
   .map{
      row => Vectors.dense(row.getAs[Seq[Double]]("features").toArray)
     }
长孙永思
2023-03-14

编辑:使用更复杂的方式来解释行中的字段。

这对我有用

val featureVectors = features.map(row => {
  Vectors.dense(row.toSeq.toArray.map({
    case s: String => s.toDouble
    case l: Long => l.toDouble
    case _ => 0.0
  }))
})

features是spark SQL的数据框架

容学林
2023-03-14

刚刚发现:

val scaledDataOnly_rdd = scaledDataOnly_pruned.map{x:Row => x.getAs[Vector](0)}
 类似资料:
  • 我使用的是Apache Spark 1.6.2 我有一个。csv数据,它包含大约800万行,我想把它转换成DataFrame 映射RDD可以很好地工作,但是当涉及到将RDD转换为DataFrame时,Spark引发了一个错误 以下是我的代码: 有超过800万行,但是当我将这些行减到只有<500行时,程序就可以正常工作了 数据很乱,每行中的总列经常不同,这就是为什么我需要首先映射它。但是,我想要的数

  • 我试图将JDBC的ResultSet转换成Spark RDD,并寻找一种有效的方法来使用Spark的并行特性。 以下是我按照这个https://stackoverflow.com/a/32073423/6064131实现的 现在的主要问题是它需要更多的时间,我知道所有数据集都是通过一根针提取的eye.But有没有更好的方法来实现这一点? 有些人可能想知道为什么我没有使用内置功能sqlContext

  • 我正在尝试将RDD转换为数据帧,但失败并出现错误: org.apache.spark.SparkException:由于阶段失败而中止作业:阶段2.0中的任务0失败4次,最近一次失败:阶段2.0中丢失任务0.3(TID 11,10.139.64.5,执行器0) 这是我的代码:

  • 嗨,伙计们,我有下一个问题。我正在使用Java的Apache Spark Streaming v1.6.0来获取来自IBMMQ的一些消息。我为MQ制作了自定义接收器,但我遇到的问题是我需要将RDD从JavaDStream转换为DataFrame。为此,我使用foreachRDD迭代JavaDStream,并定义了DataFrame的模式,但当我运行作业时,第一条消息会引发下一个异常: Java语言

  • 有人能分享一下如何将转换为吗?

  • 我是Spark和Scala的新手,我正在尝试阅读它在MLlib上的文档。 关于 http://spark.apache.org/docs/1.4.0/mllib-data-types.html 的教程, 不显示如何从本地向量列表构造RDD[Vector](可变行)。 例如,我已经在火花壳中执行(作为我探索的一部分) 如果“合并”,它将看起来像这个矩阵 那么,如何将矢量 、 转换为?