当前位置: 首页 > 知识库问答 >
问题:

火花createDataFrame()不使用Seq RDD

贺子昂
2023-03-14

CreateDataFrame接受2个参数,一个rdd和模式。

我的图式是这样的

<代码>val schemas=结构类型(Seq(StructField(“number”,IntegerType,false),StructField(“notation”,StringType,false)))

在一种情况下,我能够从RDD创建数据帧,如下所示:

`val data1=Seq(Row(1,"one"),Row(2,"two"))

val rdd=spark.sparkContext.parallelize(data1)

val final_df= spark.createDataFrame(rdd,schemas)`

在以下其他情况下。。我不能

`val data2=Seq((1,"one"),(2,"two"))

val rdd=spark.sparkContext.parallelize(data2)

val final_df= spark.createDataFrame(rdd,schemas)`

data2不能成为Dataframe的有效rdd有什么错?

但是我们可以使用toDF()和data2创建数据帧,但不能创建CreateDataFrame。

val data2\u DF=序列((1,“一”),(2,“二”))。toDF(“数字”、“符号”)

请帮助我理解这种行为。

创建dataframe时是否强制执行行?

共有1个答案

许淳
2023-03-14

在第二种情况下,只需执行以下操作:

val final_df = spark.createDataFrame(rdd)

因为您的RDD是Tuple2的RDD(这是一个产品),所以模式在编译时是已知的,所以不需要指定模式

 类似资料:
  • 我正在使用火花流,我从Kafka读取流。阅读此流后,我将其添加到hazelcast地图中。 问题是,我需要在读取Kafka的流之后立即从地图中过滤值。 我正在使用下面的代码来并行化地图值。 但在这个逻辑中,我在另一个逻辑中使用JavaRDD,即JavaInputDStream.foreachRDD,这会导致序列化问题。 第一个问题是,如何通过事件驱动来运行spark作业? 另一方面,我只是想得到一

  • 我正在尝试使用python库Tweepy来传输twitter数据。我设置了工作环境,谷歌了一下这些东西,但是我不知道它们是如何工作的。我想在python (tweepy)中使用spark streaming(DStream-Batch processing)。我至少经历了以下环节: < li >如何获取tweepy中某个位置的特定标签的推文? < Li > http://spark . Apach

  • 我在aws s3和emr上使用Spark 2.4进行项目,我有一个左连接,有两个巨大的数据部分。火花执行不稳定,它经常因内存问题而失败。 集群有10台m3.2xlarge类型的机器,每台机器有16个vCore、30 GiB内存、160个SSD GB存储。 我有这样的配置: 左侧连接发生在 150GB 的左侧和大约 30GB 的右侧之间,因此有很多随机播放。我的解决方案是将右侧切得足够小,例如 1G

  • 由于,我检查了一个spark作业的输出拼花文件,该作业总是会发出声音。我在Cloudera 5.13.1上使用了 我注意到拼花地板排的大小是不均匀的。第一排和最后一排的人很多。剩下的真的很小。。。 拼花地板工具的缩短输出,: 这是已知的臭虫吗?如何在Spark中设置拼花地板块大小(行组大小)? 编辑: Spark应用程序的作用是:它读取一个大的AVRO文件,然后通过两个分区键(使用

  • 一些脚本在工作时什么也不做,当我手动运行它们时,其中一个失败了,出现了以下消息: 错误SparkUI:未能绑定SparkUI java.net.bindexception:地址已在使用:服务“SparkUI”在重试16次后失败! 所以我想知道是否有一种特定的方法来并行运行脚本?

  • 我有一个用例,我必须以FIFO方式处理事件。这些是从机器生成的事件。每台机器每30秒生成一个事件。对于特定的机器,我们需要根据FIFO FASION对事件进行处理。 我们每天需要处理大约2.4亿个事件。对于如此大的规模,我们需要使用Kafka+火花流 从Kafka文档中,我了解到我们可以使用消息的关键字段将消息路由到特定的主题分区。这确保我可以使用机器id作为密钥,并确保来自特定机器的所有消息都进