当前位置: 首页 > 知识库问答 >
问题:

使用Scala在Spark数据帧内重新使用JSON中的模式

胡玉书
2023-03-14

我有一些像这样的JSON数据:

{"gid":"111","createHour":"2014-10-20 01:00:00.0","revisions":[{"revId":"2","modDate":"2014-11-20 01:40:37.0"},{"revId":"4","modDate":"2014-11-20 01:40:40.0"}],"comments":[],"replies":[]}
{"gid":"222","createHour":"2014-12-20 01:00:00.0","revisions":[{"revId":"2","modDate":"2014-11-20 01:39:31.0"},{"revId":"4","modDate":"2014-11-20 01:39:34.0"}],"comments":[],"replies":[]}
{"gid":"333","createHour":"2015-01-21 00:00:00.0","revisions":[{"revId":"25","modDate":"2014-11-21 00:34:53.0"},{"revId":"110","modDate":"2014-11-21 00:47:10.0"}],"comments":[{"comId":"4432","content":"How are you?"}],"replies":[{"repId":"4441","content":"I am good."}]}
{"gid":"444","createHour":"2015-09-20 23:00:00.0","revisions":[{"revId":"2","modDate":"2014-11-20 23:23:47.0"}],"comments":[],"replies":[]}
{"gid":"555","createHour":"2016-01-21 01:00:00.0","revisions":[{"revId":"135","modDate":"2014-11-21 01:01:58.0"}],"comments":[],"replies":[]}
{"gid":"666","createHour":"2016-04-23 19:00:00.0","revisions":[{"revId":"136","modDate":"2014-11-23 19:50:51.0"}],"comments":[],"replies":[]}

我可以在以下位置阅读:

val df = sqlContext.read.json("./data/full.json")

我可以打印模式与df.print模式

root
 |-- comments: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- comId: string (nullable = true)
 |    |    |-- content: string (nullable = true)
 |-- createHour: string (nullable = true)
 |-- gid: string (nullable = true)
 |-- replies: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- content: string (nullable = true)
 |    |    |-- repId: string (nullable = true)
 |-- revisions: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- modDate: string (nullable = true)
 |    |    |-- revId: string (nullable = true)

我可以显示数据。显示(10,false)

+---------------------+---------------------+---+-------------------+---------------------------------------------------------+
|comments             |createHour           |gid|replies            |revisions                                                |
+---------------------+---------------------+---+-------------------+---------------------------------------------------------+
|[]                   |2014-10-20 01:00:00.0|111|[]                 |[[2014-11-20 01:40:37.0,2], [2014-11-20 01:40:40.0,4]]   |
|[]                   |2014-12-20 01:00:00.0|222|[]                 |[[2014-11-20 01:39:31.0,2], [2014-11-20 01:39:34.0,4]]   |
|[[4432,How are you?]]|2015-01-21 00:00:00.0|333|[[I am good.,4441]]|[[2014-11-21 00:34:53.0,25], [2014-11-21 00:47:10.0,110]]|
|[]                   |2015-09-20 23:00:00.0|444|[]                 |[[2014-11-20 23:23:47.0,2]]                              |
|[]                   |2016-01-21 01:00:00.0|555|[]                 |[[2014-11-21 01:01:58.0,135]]                            |
|[]                   |2016-04-23 19:00:00.0|666|[]                 |[[2014-11-23 19:50:51.0,136]]                            |
+---------------------+---------------------+---+-------------------+---------------------------------------------------------+

我可以打印/读取模式。模式为:

StructType(StructField(comments,ArrayType(StructType(StructField(comId,StringType,true), StructField(content,StringType,true)),true),true), StructField(createHour,StringType,true), StructField(gid,StringType,true), StructField(replies,ArrayType(StructType(StructField(content,StringType,true), StructField(repId,StringType,true)),true),true), StructField(revisions,ArrayType(StructType(StructField(modDate,StringType,true), StructField(revId,StringType,true)),true),true))

我可以打印出来更好:

println(df.schema.fields.mkString(",\n"))
StructField(comments,ArrayType(StructType(StructField(comId,StringType,true), StructField(content,StringType,true)),true),true),
StructField(createHour,StringType,true),
StructField(gid,StringType,true),
StructField(replies,ArrayType(StructType(StructField(content,StringType,true), StructField(repId,StringType,true)),true),true),
StructField(revisions,ArrayType(StructType(StructField(modDate,StringType,true), StructField(revId,StringType,true)),true),true)

现在,如果我在同一个文件中阅读,没有注释和回复行,val df2=sqlContext。阅读json(“/data/partialRevOnly.json”)只要删除这些行,我就可以用打印模式得到这样的结果:

root
 |-- comments: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- createHour: string (nullable = true)
 |-- gid: string (nullable = true)
 |-- replies: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- revisions: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- modDate: string (nullable = true)
 |    |    |-- revId: string (nullable = true)

我不喜欢这样,所以我使用:

val df3 = sqlContext.read.
  schema(dfSc).
  json("./data/partialRevOnly.json")

其中原始模式是dfSc。所以现在我得到了与之前删除的数据完全相同的模式:

root
 |-- comments: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- comId: string (nullable = true)
 |    |    |-- content: string (nullable = true)
 |-- createHour: string (nullable = true)
 |-- gid: string (nullable = true)
 |-- replies: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- content: string (nullable = true)
 |    |    |-- repId: string (nullable = true)
 |-- revisions: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- modDate: string (nullable = true)
 |    |    |-- revId: string (nullable = true)

这太完美了。。。差不多了。我想将此模式分配给类似以下内容的变量:

val textSc =  StructField(comments,ArrayType(StructType(StructField(comId,StringType,true), StructField(content,StringType,true)),true),true),
    StructField(createHour,StringType,true),
    StructField(gid,StringType,true),
    StructField(replies,ArrayType(StructType(StructField(content,StringType,true), StructField(repId,StringType,true)),true),true),
    StructField(revisions,ArrayType(StructType(StructField(modDate,StringType,true), StructField(revId,StringType,true)),true),true)

好的-由于双引号和“一些其他结构”的东西,这将不起作用,所以请尝试以下方法(有错误):

import org.apache.spark.sql.types._

val textSc = StructType(Array(
    StructField("comments",ArrayType(StructType(StructField("comId",StringType,true), StructField("content",StringType,true)),true),true),
    StructField("createHour",StringType,true),
    StructField("gid",StringType,true),
    StructField("replies",ArrayType(StructType(StructField("content",StringType,true), StructField("repId",StringType,true)),true),true),
    StructField("revisions",ArrayType(StructType(StructField("modDate",StringType,true), StructField("revId",StringType,true)),true),true)
))

Name: Compile Error
Message: <console>:78: error: overloaded method value apply with alternatives:
  (fields: Array[org.apache.spark.sql.types.StructField])org.apache.spark.sql.types.StructType <and>
  (fields: java.util.List[org.apache.spark.sql.types.StructField])org.apache.spark.sql.types.StructType <and>
  (fields: Seq[org.apache.spark.sql.types.StructField])org.apache.spark.sql.types.StructType
 cannot be applied to (org.apache.spark.sql.types.StructField, org.apache.spark.sql.types.StructField)
           StructField("comments",ArrayType(StructType(StructField("comId",StringType,true), StructField("content",StringType,true)),true),true),

... 如果没有这个错误(我无法快速找到解决方法),我希望使用textSc代替dfSc来读取带有强制模式的JSON数据。

我找不到“1对1匹配”的获取方式(通过println或…)具有可接受语法的模式(类似于上面)。我认为可以使用大小写匹配进行一些编码,以消除双引号。然而,我仍然不清楚需要什么规则才能从测试夹具中获得准确的模式,我可以简单地在我的循环生产(相对于测试夹具)代码中重复使用。有没有办法让这个模式完全按照我编写的代码打印出来?

注意:这包括双引号和所有适当的StructField/type等,以便与代码兼容。

作为一个边栏,我考虑保存一个完整格式的golden JSON文件,以便在Spark作业开始时使用,但我希望最终使用日期字段和其他更简洁的类型,而不是在适用的结构位置使用字符串。

如何从我的测试工具(使用带有注释和回复的完全格式的JSON输入行)中获取dataFrame信息,以便将模式作为源代码放入生产代码Scala Spark作业?

注:最好的答案是一些编码方法,但一个解释让我可以跋涉、扑通、辛劳、涉水、犁和艰难地完成编码也很有帮助。:)


共有2个答案

穆鸿飞
2023-03-14

那么,错误消息应该告诉您这里需要知道的一切-StructType需要一系列字段作为参数。因此,在您的案例中,模式应该如下所示:

StructType(Seq(
  StructField("comments", ArrayType(StructType(Seq(       // <- Seq[StructField]
    StructField("comId", StringType, true),
    StructField("content", StringType, true))), true), true), 
  StructField("createHour", StringType, true),
  StructField("gid", StringType, true),
  StructField("replies", ArrayType(StructType(Seq(        // <- Seq[StructField]
    StructField("content", StringType, true),
    StructField("repId", StringType, true))), true), true),
  StructField("revisions", ArrayType(StructType(Seq(      // <- Seq[StructField]
    StructField("modDate", StringType, true),
    StructField("revId", StringType, true))),true), true)))

江超英
2023-03-14

我最近遇到了这个。我正在使用Spark 2.0.2,所以我不知道这个解决方案是否适用于早期版本。

import scala.util.Try
import org.apache.spark.sql.Dataset
import org.apache.spark.sql.catalyst.parser.LegacyTypeStringParser
import org.apache.spark.sql.types.{DataType, StructType}

/** Produce a Schema string from a Dataset */
def serializeSchema(ds: Dataset[_]): String = ds.schema.json

/** Produce a StructType schema object from a JSON string */
def deserializeSchema(json: String): StructType = {
    Try(DataType.fromJson(json)).getOrElse(LegacyTypeStringParser.parse(json)) match {
        case t: StructType => t
        case _ => throw new RuntimeException(s"Failed parsing StructType: $json")
    }
}

请注意,我刚刚从Spark structType对象中的私有函数复制的“反序列化”函数。我不知道跨版本对它的支持程度如何。

 类似资料:
  • 我想使用Spark和Scala强制转换dataframe的模式以更改某些列的类型。 具体地说,我正在尝试使用AS[U]函数,其描述为:“返回一个新的数据集,其中每个记录都映射到指定的类型。用于映射列的方法取决于U的类型。” 原则上,这正是我想要的,但我不能使它起作用。 下面是一个取自https://github.com/apache/spark/blob/master/sql/core/src/t

  • 在Akka中有没有什么方法可以像在Erlang中一样用{packet,4}来实现包帧?数据包如下所示:

  • 我正在尝试使用Scala中的Spark SQL查询Cassandra数据。 并抛出错误: org.apache.spark.sql.AnalysisException:找不到表或视图:.;第1行第14位;'Project[*]+-'UnresolvedRelation. 谢谢你。

  • 我正在考虑将dataset1分解为每个“T”类型的多个记录,然后与DataSet2连接。但是你能给我一个更好的方法,如果数据集变大了,它不会影响性能吗?

  • 我正在尝试访问配置单元表,并从表/数据帧中提取和转换某些列,然后将这些新列放入新的数据帧中。我试着用这种方式- 它使用SBT构建时没有任何错误。但当我尝试运行它时,我收到以下错误- 我想了解是什么导致了这个错误,如果有任何其他的方法来完成我正在尝试做的事情。

  • 我是Spark的新手,我正在使用scala编程。我想从HDFS或S3中读取一个文件,并将其转换为Spark数据帧。Csv文件的第一行是模式。但是,如何创建具有未知列的模式的数据框架呢?我使用下面的代码为一个已知的模式创建数据框架。 }