当前位置: 首页 > 知识库问答 >
问题:

写入数据帧时出错:java。lang.RuntimeException:scala。Tuple2不是struct架构的有效外部类型

公冶和豫
2023-03-14

我有一个数据集,在将其写入json之前,我正在提取并应用特定的模式。

我的测试数据集如下所示:

cityID|retailer|postcode

123|a1|1

123|s1|2

123|d1|3

124|a1|4

124|s1|5

124|d1|6

我想按城市ID分组。然后应用下面的模式并将其放入数据框中。然后我想把数据写成json。我的代码如下:

按城市ID分组

val rdd1 = cridf.rdd.map(x=>(x(0).toString, (x(1).toString, x(2).toString))).groupByKey() 

将RDD映射到行

val final1 = rdd1.map(x=>Row(x._1,x._2.toList))

正在应用架构

val schema2 = new StructType()
.add("cityID", StringType)
.add("reads", ArrayType(new StructType()
.add("retailer", StringType)
.add("postcode", IntegerType)))

创建数据框

val parsedDF2 = spark.createDataFrame(final1, schema2)

写入json文件

parsedDF2.write.mode("overwrite")
.format("json")
.option("header", "false")
.save("/XXXX/json/testdata")

由于以下错误,作业中止:

java.lang.RuntimeException:编码时出错:

java.lang.RuntimeException:scala. Tuple2不是结构模式的有效外部类型

共有2个答案

云伯寅
2023-03-14

如果无法避免使用RDD,可以使用case类:

case class Read(retailer: String, postcode: Int)
case class Record(cityId: String, reads: List[Read])

...

val rdd1 = cridf.rdd
    .map(x => (x.head, Read(x(1), x(2).toInt)))
    .groupByKey

val final1 = rdd1
    .map(x => Record(x._1, x._2.toList))
    .toDF

final1
   .write
   .mode("overwrite")
   .format("json")
   .option("header", "false")
   .save("/XXXX/json/testdata")

final1具有以下模式:

root
 |-- cityId: string (nullable = true)
 |-- reads: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- retailer: string (nullable = true)
 |    |    |-- postcode: integer (nullable = false)

然而,我认为@partha\u devArch解决方案要好得多。

只需对代码添加最少的内容并使用提供的模式,解决方案如下:

import org.apache.spark.sql.catalyst.encoders.RowEncoder

...

val rdd1 = cridf.rdd
    .map(x => (x.head, Row(x(1), x(2).toInt)))
    .groupByKey

val final1 = rdd1
    .map(x => Row(x._1, x._2.toList))(RowEncoder.apply(schema2).clsTag)

val parsedDF2 = spark.createDataFrame(final1, schema2)

parsedDF2
    .write
    .mode("overwrite")
    .format("json")
    .option("header", "false")
    .save("/XXXX/json/testdata")
甄正信
2023-03-14

您可以直接从数据帧进行转换。给你:

   val rawData = spark.read.option("header", "true").option("delimiter", "|").csv("57407427.csv")

   import org.apache.spark.sql.functions._
   val readsDf = rawData.withColumn("reads",struct("retailer", "postcode")).drop("retailer", "postcode" )

   val finalJsonDf = readsDf.groupBy("cityID").agg(collect_list("reads").alias("reads"))

   finalJsonDf.printSchema() //for testing the schema

   finalJsonDf.coalesce(1).write.mode("overwrite")
     .format("json")
     .option("header", "false")
     .save("57407427_Op.json")

希望您也能写出相同的json输出:

 {"cityID":"124","reads":[{"retailer":"a1","postcode":"4"},{"retailer":"s1","postcode":"5"},{"retailer":"d1","postcode":"6"}]}
 {"cityID":"123","reads":[{"retailer":"a1","postcode":"1"},{"retailer":"s1","postcode":"2"},{"retailer":"d1","postcode":"3"}]}
 类似资料:
  • 我正在尝试使用Databricks的spark-csv2.10依赖关系将一个数据帧写入到HDFS的*.csv文件。依赖关系似乎可以正常工作,因为我可以将.csv文件读入数据帧。但是当我执行写操作时,我会得到以下错误。将头写入文件后会出现异常。 当我将查询更改为时,write工作很好。 有谁能帮我一下吗? 编辑:根据Chandan的请求,这里是的结果

  • 我正在尝试将一个文件夹中的多个csv文件导入到一个数据帧中。这是我的密码。它可以遍历文件并成功地打印它们,它可以将一个文件读入一个数据帧,但将它们组合在一起打印会出错。我看到了很多类似的问题,但回答是复杂的,我认为“Pythonic”的方式是简单的,因为我是新手。事先谢谢你的帮助。错误消息总是:没有这样的文件或目录:“一些文件名”,这没有意义,因为它成功地打印了文件名在打印步骤。

  • 考虑到这一点,似乎在过去做过这件事,但是…: 我明白了: 无法立即看到它。

  • 我正在使用mongodb构建一个SpringWebApp,但最近我开始在写DB时遇到问题。下面是我得到的堆栈跟踪。 阅读之后,最常见的原因似乎是依赖项不匹配,但我不确定在这种情况下,哪些依赖项实际上是相互兼容的。 波姆。xml 这些主要是最新的发布版本,尽管我将Spring框架更改为4.0.7。发布尝试并修复它,但我也使用了4.1.0。释放并发生相同的异常。 谢啦 EDIT:@Document注释

  • 问题内容: 我正在使用带有Java程序的MySql数据库,现在我想将该程序提供给其他人。 如何导出MySql数据库结构而不包含数据,仅导出结构? 问题答案: 您可以通过mysqldump命令使用该选项

  • 使用“file_loads”技术通过Apache Beam数据流作业写入BigQuery时出错。流式插入(else块)工作正常,符合预期。file_load(如果块)失败,错误在代码后面给出。bucket中GCS上的临时文件是有效的JSON对象。 来自pub/sub的原始事件示例: 数据流作业出错: