我正在使用一个Spark数据框,该数据框可能正在从三个不同架构版本之一加载数据:
// Original
{ "A": {"B": 1 } }
// Addition "C"
{ "A": {"B": 1 }, "C": 2 }
// Additional "A.D"
{ "A": {"B": 1, "D": 3 }, "C": 2 }
我可以通过检查架构是否包含字段“ C”以及是否不向数据框添加新列来处理其他“ C”。但是我不知道如何为子对象创建一个字段。
public void evolvingSchema() {
String versionOne = "{ \"A\": {\"B\": 1 } }";
String versionTwo = "{ \"A\": {\"B\": 1 }, \"C\": 2 }";
String versionThree = "{ \"A\": {\"B\": 1, \"D\": 3 }, \"C\": 2 }";
process(spark.getContext(), "1", versionOne);
process(spark.getContext(), "2", versionTwo);
process(spark.getContext(), "2", versionThree);
}
private static void process(JavaSparkContext sc, String version, String data) {
SQLContext sqlContext = new SQLContext(sc);
DataFrame df = sqlContext.read().json(sc.parallelize(Arrays.asList(data)));
if(!Arrays.asList(df.schema().fieldNames()).contains("C")) {
df = df.withColumn("C", org.apache.spark.sql.functions.lit(null));
}
// Not sure what to put here. The fieldNames does not contain the "A.D"
try {
df.select("C").collect();
} catch(Exception e) {
System.out.println("Failed to C for " + version);
}
try {
df.select("A.D").collect();
} catch(Exception e) {
System.out.println("Failed to A.D for " + version);
}
}
JSON源不是非常适合具有不断发展的模式的数据(而不是Avro或Parquet),但是简单的解决方案是对所有源使用相同的模式,并使新字段为可选/可为空:
import org.apache.spark.sql.types.{StructType, StructField, LongType}
val schema = StructType(Seq(
StructField("A", StructType(Seq(
StructField("B", LongType, true),
StructField("D", LongType, true)
)), true),
StructField("C", LongType, true)))
您可以schema
像这样传递给DataFrameReader
:
val rddV1 = sc.parallelize(Seq("{ \"A\": {\"B\": 1 } }"))
val df1 = sqlContext.read.schema(schema).json(rddV1)
val rddV2 = sc.parallelize(Seq("{ \"A\": {\"B\": 1 }, \"C\": 2 }"))
val df2 = sqlContext.read.schema(schema).json(rddV2)
val rddV3 = sc.parallelize(Seq("{ \"A\": {\"B\": 1, \"D\": 3 }, \"C\": 2 }"))
val df3 = sqlContext.read.schema(schema).json(rddV3)
并且您将获得一个独立于变体的一致结构:
require(df1.schema == df2.schema && df2.schema == df3.schema)
缺少的列会自动设置为null
:
df1.printSchema
// root
// |-- A: struct (nullable = true)
// | |-- B: long (nullable = true)
// | |-- D: long (nullable = true)
// |-- C: long (nullable = true)
df1.show
// +--------+----+
// | A| C|
// +--------+----+
// |[1,null]|null|
// +--------+----+
df2.show
// +--------+---+
// | A| C|
// +--------+---+
// |[1,null]| 2|
// +--------+---+
df3.show
// +-----+---+
// | A| C|
// +-----+---+
// |[1,3]| 2|
// +-----+---+
注意事项 :
此解决方案取决于数据源。它可能与其他来源一起使用或可能不一起使用,甚至导致记录格式错误。
zero323已回答了问题,但在Scala中。这是同一件事,但是在Java中。
public void evolvingSchema() {
String versionOne = "{ \"A\": {\"B\": 1 } }";
String versionTwo = "{ \"A\": {\"B\": 1 }, \"C\": 2 }";
String versionThree = "{ \"A\": {\"B\": 1, \"D\": 3 }, \"C\": 2 }";
process(spark.getContext(), "1", versionOne);
process(spark.getContext(), "2", versionTwo);
process(spark.getContext(), "2", versionThree);
}
private static void process(JavaSparkContext sc, String version, String data) {
StructType schema = DataTypes.createStructType(Arrays.asList(
DataTypes.createStructField("A",
DataTypes.createStructType(Arrays.asList(
DataTypes.createStructField("B", DataTypes.LongType, true),
DataTypes.createStructField("D", DataTypes.LongType, true))), true),
DataTypes.createStructField("C", DataTypes.LongType, true)));
SQLContext sqlContext = new SQLContext(sc);
DataFrame df = sqlContext.read().schema(schema).json(sc.parallelize(Arrays.asList(data)));
try {
df.select("C").collect();
} catch(Exception e) {
System.out.println("Failed to C for " + version);
}
try {
df.select("A.D").collect();
} catch(Exception e) {
System.out.println("Failed to A.D for " + version);
}
}
问题内容: 目前,我有两个几乎相同的架构: 和 它们的唯一区别在于验证:用户不需要名字,姓氏或电话。但是,管理员必须定义这些属性。 不幸的是,上面的代码不是很干,因为它们几乎相同。因此,我想知道是否有可能基于。例如: 显然,这只是伪代码。这样的事情可能吗? 另一个非常相似的问题是,是否有可能基于另一个创建新的架构,并为其添加更多属性。例如: 问题答案: 在其他地方,有人建议使用utils.inhe
保护浏览器不受扩展的缺陷影响 保护浏览器不受扩展的缺陷影响 Adam Barth, Adrienne Porter Felt, Prateek Saxena, and Aaron Boodman EECS Department. University of California, Berkeley. Technical Report No. UCB/EECS-2009-185 摘要 浏览器扩展非常
架构及扩展 一、单入口应用程序 基于SpeedPHP框架开发的应用程序,均是“单入口应用程序”(以下简称单入口程序),这是SpeedPHP框架的默认配置。 单入口程序概述 指在同一个应用程序中,访问者仅可以通过相同的一个文件来使用整个应用程序的功能。这个文件称作“入口文件”,而这种结构的应用程序整体就称作“单入口应用程序”。 在sp框架内,入口文件就是程序顶级目录的index.php文件。 一般而
第十三章介绍了如何开发一个Web框架,通过介绍MVC、路由、日志处理、配置处理完成了一个基本的框架系统,但是一个好的框架需要一些方便的辅助工具来快速的开发Web,那么我们这一章将就如何提供一些快速开发Web的工具进行介绍,第一小节介绍如何处理静态文件,如何利用现有的twitter开源的bootstrap进行快速的开发美观的站点,第二小节介绍如何利用前面介绍的session来进行用户登录处理,第三小
作者:陈希章 发表于 2018年4月7日 前言 此前我有一篇 文章 讲解了Microsoft Graph的一种数据扩展技术—— 开发扩展(Open Extensions),它可以实现在支持的对象(例如用户,组等)上面附加任意的数据。但开放扩展的问题在于,它是基于某个具体对象的,你无法确定两个对象是否具有同样的扩展(即便属性名一样,但也可能其包含的数据完全不同)。如果我们需要对一类对象进行统一的扩展
问题内容: 我有一个用Swift编写的应用程序(简称MyApp),其目标如下: :主要目标 :一个目标,用于为应用及其扩展程序(主要是API后端和数据库处理)之间共享的代码构建框架 :使用框架的Today View小部件(或现在称为的小部件)。 该框架链接到使用它的每个目标,即和。输入Cocoapods:我以前具有以下Podfile结构: 这里的目的是仅将框架公开给其他部分,而不是将其所有pod