我在Spark中有一个数据框,看起来像这样:
它有30列:只显示其中的一些!
[ABCD,color,NORMAL,N,2015-02-20,1]
[XYZA,color,NORMAL,N,2015-05-04,1]
[GFFD,color,NORMAL,N,2015-07-03,1]
[NAAS,color,NORMAL,N,2015-08-26,1]
[LOWW,color,NORMAL,N,2015-09-26,1]
[KARA,color,NORMAL,N,2015-11-08,1]
[ALEQ,color,NORMAL,N,2015-12-04,1]
[VDDE,size,NORMAL,N,2015-12-23,1]
[QWER,color,NORMAL,N,2016-01-18,1]
[KDSS,color,NORMAL,Y,2015-08-29,1]
[KSDS,color,NORMAL,Y,2015-08-29,1]
[ADSS,color,NORMAL,Y,2015-08-29,1]
[BDSS,runn,NORMAL,Y,2015-08-29,1]
[EDSS,color,NORMAL,Y,2015-08-29,1]
因此,我必须在Scala中将这个数据帧转换成一个键值对,使用键作为数据帧中的一些列,并为这些键分配从索引0到计数(不同的键数)的唯一值。
例如:使用上面的案例,我希望在Scala中的map(key-value)集合中有一个输出,如下所示:
([ABC_color_NORMAL_N_1->0]
[XYZA_color_NORMAL_N_1->1]
[GFFD_color_NORMAL_N_1->2]
[NAAS_color_NORMAL_N_1->3]
[LOWW_color_NORMAL_N_1->4]
[KARA_color_NORMAL_N_1->5]
[ALEQ_color_NORMAL_N_1->6]
[VDDE_size_NORMAL_N_1->7]
[QWER_color_NORMAL_N_1->8]
[KDSS_color_NORMAL_Y_1->9]
[KSDS_color_NORMAL_Y_1->10]
[ADSS_color_NORMAL_Y_1->11]
[BDSS_runn_NORMAL_Y_1->12]
[EDSS_color_NORMAL_Y_1->13]
)
我对斯卡拉和斯帕克是新手,我试着做这样的事情。
var map: Map[String, Int] = Map()
var i = 0
dataframe.foreach( record =>{
//Is there a better way of creating a key!
val key = record(0) + record(1) + record(2) + record(3)
var index = i
map += (key -> index)
i+=1
}
)
但是,这不起作用。:/此操作完成后,映射为空。
代码中的主要问题是试图在workers上执行的代码中修改在驱动程序端创建的变量。使用Spark时,RDD转换中的驱动端变量只能用作“只读”值。
明确地:
Foreach
完成时,结果被丢弃-结果不会返回给驱动程序。要解决这个问题,您应该选择一个转换,返回一个更改的RDD(例如,map
)来创建密钥,使用zipWithIndex
添加正在运行的“ID”,然后使用collectaMap
将所有数据作为映射返回给驱动程序:
val result: Map[String, Long] = dataframe
.map(record => record(0) + record(1) + record(2) + record(3))
.zipWithIndex()
.collectAsMap()
至于键创建本身——假设您想包括前5列,并在它们之间添加分隔符(_
),您可以使用:
record => record.toList.take(5).mkString("_")
RDD是以数组[数组[字符串]的格式创建的,具有以下值: 我想用模式创建一个数据帧: 接下来的步骤: 给出以下错误:
我正在尝试将熊猫DF转换为Spark one。测向头: 代码: 我得到了一个错误:
我正在尝试将RDD[String]转换为数据框。字符串是逗号分隔的,所以我希望逗号之间的每个值都有一列。为此,我尝试了以下步骤: 但我明白了: 这不是这篇文章的副本(如何将rdd对象转换为火花中的数据帧),因为我要求RDD[字符串]而不是RDD[行]。 而且它也不是火花加载CSV文件作为DataFrame的副本?因为这个问题不是关于将CSV文件读取为DataFrame。
我有以下两个场景共享的前奏代码: 现在,我想将df转换为pyspark数据帧(
来自Spark源代码: https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala#L2972 可能需要与首先计算 所需的时间一样长。因此,这使得诸如 非常贵。假设< code>DataFrame是< code>DataSet[Row],而< code>
我在S3有一些遗留数据,我想使用Spark 2和Java API将它们转换成parquet格式。 我有所需的Avro模式(. avsc文件)及其使用Avro编译器生成的Java类,我想使用这些模式以Parque格式存储数据。输入数据不是任何标准格式,但我有一个库,可以将遗留文件中的每一行转换为Avro类。 是否可以将数据作为