Spark DataFrame Schema
root
|-- promotion-id: string (nullable = true)
|-- custom-attributes: struct (nullable = true)
| |-- custom-attribute: array (nullable = true)
| | |-- element: struct (containsNull = true)
| | | |-- _VALUE: string (nullable = true)
| | | |-- _attribute-id: string (nullable = true)
| | | |-- value: array (nullable = true)
| | | | |-- element: string (containsNull = true)
Sample Input Data
+-------------------------------+-----------------------------------------------------------
|_promotion-id |custom-attribute
+-------------------------------+-----------------------------------------------------------
|10-off-selected-appliances-wk39|[[false, geDoNotConvert,], [false, geLoyaltyPromotion,]]
|grewards_wk38_100_prize_draw |[[,georgeClubAnswers,[Ed, Prof, Sam]]]
Sample output data
promotion_id geDoNotConvert geLoyaltyPromotion georgeClubAnswers
10-off-selected-appliances-wk39 false false null
grewards_wk38_100_prize_draw null null [Ed, Prof, Sam]
Sample Code
val df1 = df.selectExpr("*", "inline(`custom-attributes`.`custom-attribute`)")
df1.groupBy("`_promotion-id`").pivot("_attribute-id").agg(first(col("`_VALUE`")))
数据-我使用XML中的许多附加列获取此类数据,并使用com。databricks spark-xml\u 2.11库,用于将xml数据转换为数据帧。
要求-必须从数组(struct)类型或列custom\u属性转换数据。示例中的custom\u属性,如示例输出所示。My struct有三个字段,分别命名为“\u VALUE”、“属性\u id”、“值”。我需要将属性id转换为列名称,数据为-检查“\u VALUE”是否为非null,如果是,则从该列中选取数据。否则从“值”列中选择数据。请注意,这些列的数据类型可能不同。
此外,我知道需要属性id的列表。
方法1
正如我知道的属性ID一样,我是否可以迭代数组(struct)来识别具有匹配属性ID的结构,并从“\u value”/“value”列中选择值?
方法2
使用内联函数展平DF,并通过获取“\u VALUE”/“VALUE”来透视属性ID列
问题:
方法1-我们可以使用UDF实现它吗?任何例子都会有帮助。
方法2-如果我有多个数组(struct)类型的列怎么办?此外,在pivot和aggr步骤中,我需要对“\u VALUE”/“VALUE”列执行三元操作。我们如何实现它?任何例子都会有帮助
我将回答方法1
假设属性id
在中是唯一的
定义案例类别-可选
case class Attribute(attributeId:String,_Value: String,value: Seq[String]) // Just for readable purpose.
case class CustomAttributes(geDoNotConvert:String,geLoyaltyPromotion:String,georgeClubAnswers:Seq[String]) // Define required custom attributes
定义自定义项
def parseXml:UserDefinedFunction = udf((customAttribute: Seq[Row]) => {
val attributes = customAttribute.map{row =>
val _VALUE = row.getAs[String]("_VALUE") // extracting "_VALUE"
val _attribute_id = row.getAs[String]("_attribute-id") // extracting "_attribute-id"
val value = row.getAs[Seq[String]]("value") // extracting "value"
Attribute(_attribute_id,_VALUE,value) // Wrapping above all columns into case class "CustomAttribute"
} // Getting all attributes in to an Seq[CustomAttribute]
val geDoNotConvert = attributes.filter(p => p.attributeId == "geDoNotConvert").headOption.map(_._Value).getOrElse(null)
val geLoyaltyPromotion = attributes.filter(p => p.attributeId == "geLoyaltyPromotion").headOption.map(_._Value).getOrElse(null)
val georgeClubAnswers = attributes.filter(p => p.attributeId == "georgeClubAnswers").headOption.map(_.value).getOrElse(null)
CustomAttributes(geDoNotConvert,geLoyaltyPromotion,georgeClubAnswers) // Returning Case class
})
读取XML文件,使用UDF分析所需列
val df = spark.read.option("rowTag", "promotion").xml(xmlPath)
.select($"_promotion-id",$"custom-attributes.*")
.withColumn("customAttribute",parseXml($"custom-attribute"))
.select("_promotion-id","customAttribute.*")
正在打印架构-<代码>df。printSchema()
root
|-- _promotion-id: string (nullable = true)
|-- geDoNotConvert: string (nullable = true)
|-- geLoyaltyPromotion: string (nullable = true)
|-- georgeClubAnswers: array (nullable = true)
| |-- element: string (containsNull = true)
最终输出-<代码>df。显示(假)
+-------------------------------+--------------+------------------+----------------------------------------+
|_promotion-id |geDoNotConvert|geLoyaltyPromotion|georgeClubAnswers |
+-------------------------------+--------------+------------------+----------------------------------------+
|grewards_wk38_100_prize_draw |false |false |[Ed Sheeran, Professor Green, Sam Smith]|
|10-off-selected-appliances-wk39|false |false |null |
+-------------------------------+--------------+------------------+----------------------------------------+
执行时间,用于两个记录
方法1-所用时间:<代码>4698 ms
方法2-所用时间:8529 ms
当我试图用Intellij编译我的项目时,sbt正在抱怨未解决的依赖项 [Warn]===public:已尝试[Warn]https://repol.maven.org/maven2/org/apache/spark/spark-core/2.1.1/spark-core-2.1.1.pom[Warn]未解析的依赖关系路径:org.apache.spark:spark-core:2.1.1 我的s
关于JAX-WS Web服务,我现在面临一个恼人的错误几天了。我打算用使用自定义类型(基本上是“JAX-WS原语”类型的结构,如int、long和string)作为参数和返回值的方法生成一个网络服务。 这是我在尝试发布Web服务时遇到的例外: 异常线程"main"javax.xml.ws.WebServiceExc0019:类org.econet.ecomanager.msgexchange.we
我在aws s3和emr上使用Spark 2.4进行项目,我有一个左连接,有两个巨大的数据部分。火花执行不稳定,它经常因内存问题而失败。 集群有10台m3.2xlarge类型的机器,每台机器有16个vCore、30 GiB内存、160个SSD GB存储。 我有这样的配置: 左侧连接发生在 150GB 的左侧和大约 30GB 的右侧之间,因此有很多随机播放。我的解决方案是将右侧切得足够小,例如 1G
我有以下格式的json文件: 那么,我如何在pig中解析这个json。。 此外,categories和rep中可以有一些char。。可能并不总是空的。我做了以下尝试。 但我得到这个错误: 组织。科德豪斯。杰克逊。JsonParseException:意外字符('D'(代码68)):在[源代码:java.io]处应为有效值(数字、字符串、数组、对象、“true”、“false”或“null”)。By
我在解析从Excel中的API检索的XML文件时遇到问题。我可以成功地检索数据集(如下所示),但我找到的将每个字段格式化为其自身单元格的表格式的解决方案没有奏效,我认为这是因为XML的格式化方式。 每个XML看起来都类似于下面的内容。可能需要独立提取多个消息ID。(这是我遇到的另一个问题。“消息ID”中的空格引发了各种错误。) 下面是我尝试使用的解决方案:如何使用vba解析XML 下面是我尝试拉取
我在火花数据帧中有一个“结构类型”列,它有一个数组和一个字符串作为子字段。我想修改数组并返回相同类型的新列。我可以用UDF处理它吗?或者有什么替代方案? 似乎我需要行类型的UDF,类似 这是有意义的,因为Spark不知道返回类型的模式。不幸的是,udf.register也失败了: