当前位置: 首页 > 知识库问答 >
问题:

Spark scala基于规则从数组列派生列

申屠泉
2023-03-14

我是火花和scala新手。我有一个json数组结构作为输入,类似于下面的模式。

root
|-- entity: struct (nullable = true)
|    |-- email: string (nullable = true)
|    |-- primaryAddresses: array (nullable = true)
|    |    |-- element: struct (containsNull = true)
|    |    |    |-- postalCode: string (nullable = true)
|    |    |    |-- streetAddress: struct (nullable = true)
|    |    |    |    |-- line1: string (nullable = true)

我将数组结构展平到下面的示例数据帧

+-------------+--------------------------------------+--------------------------------------+
|entity.email |entity.primaryAddresses[0].postalCode |entity.primaryAddresses[1].postalCode |....
+-------------+--------------------------------------+--------------------------------------+
|a@b.com      |                                      |                                      |
|a@b.com      |                                      |12345                                 |
|a@b.com      |12345                                 |                                      |
|a@b.com      |0                                     |0                                     |
+-------------+--------------------------------------+--------------------------------------+

我的最终目标是为数据质量度量的每一列计算存在/不存在/零计数。但在计算数据质量指标之前,我正在寻找一种方法,为每个数组列元素派生一个新列,如下所示:

  • 如果特定数组元素的所有值都为空,则该元素的派生列为空
  • 如果数组元素至少存在一个值,则将元素存在视为1
  • 如果数组元素的所有值均为零,则我将该元素标记为零(稍后计算数据质量时,我将其校准为存在=1和零=1)

下面是一个示例中间数据帧,我试图通过为每个数组元素派生的列来实现它。原始数组元素被删除。

 
+-------------+--------------------------------------+
|entity.email |entity.primaryAddresses.postalCode    |.....
+-------------+--------------------------------------+
|a@b.com      |                                      |
|a@b.com      |1                                     |
|a@b.com      |1                                     |
|a@b.com      |0                                     |
+-------------+--------------------------------------+

输入json记录元素是动态的,可以更改。为了派生数组元素的列,我构建了一个scala映射,其中键作为列名,不带数组索引(例如:entity.primaryAddresses.postalCode),值作为数组元素的列表,以运行特定键的规则。我正在寻找一种方法来实现上述中间数据帧。

一个问题是,对于某些输入文件,在我展平数据帧后,数据帧列计数超过70k。由于记录数量预计将达到数百万,我想知道我是否应该分解每个元素以获得更好的性能,而不是将json扁平化。

欣赏任何想法。非常感谢。

共有2个答案

孙元明
2023-03-14

您可以利用可以帮助您执行数据质量指标的自定义用户定义函数。

val postalUdf = udf((postalCode0: Int, postalCode1: Int) => {
        //TODO implement you logic here
    })

然后使用is创建一个新的dataframe列

df
  .withColumn("postcalCode", postalUdf(col("postalCode_0"), col("postalCode_1")))
  .show()
钮出野
2023-03-14

创建助手函数

使用下面的函数提取列

scala> df.printSchema
root
 |-- entity: struct (nullable = false)
 |    |-- email: string (nullable = false)
 |    |-- primaryAddresses: array (nullable = false)
 |    |    |-- element: struct (containsNull = false)
 |    |    |    |-- postalCode: string (nullable = false)
 |    |    |    |-- streetAddress: struct (nullable = false)
 |    |    |    |    |-- line1: string (nullable = false)

import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import scala.annotation.tailrec
import scala.util.Try

implicit class DFHelpers(df: DataFrame) {
    def columns = {
      val dfColumns = df.columns.map(_.toLowerCase)
      df.schema.fields.flatMap { data =>
        data match {
          case column if column.dataType.isInstanceOf[StructType] => {
            column.dataType.asInstanceOf[StructType].fields.map { field =>
              val columnName = column.name
              val fieldName = field.name
              col(s"${columnName}.${fieldName}").as(s"${columnName}_${fieldName}")
            }.toList
          }
          case column => List(col(s"${column.name}"))
        }
      }
    }

    def flatten: DataFrame = {
      val empty = df.schema.filter(_.dataType.isInstanceOf[StructType]).isEmpty
      empty match {
        case false =>
          df.select(columns: _*).flatten
        case _ => df
      }
    }
    def explodeColumns = {
      @tailrec
      def columns(cdf: DataFrame):DataFrame = cdf.schema.fields.filter(_.dataType.typeName == "array") match {
        case c if !c.isEmpty => columns(c.foldLeft(cdf)((dfa,field) => {
          dfa.withColumn(field.name,explode_outer(col(s"${field.name}"))).flatten
        }))
        case _ => cdf
      }
      columns(df.flatten)
    }
}
scala> df.explodeColumns.printSchema
root
 |-- entity_email: string (nullable = false)
 |-- entity_primaryAddresses_postalCode: string (nullable = true)
 |-- entity_primaryAddresses_streetAddress_line1: string (nullable = true)

 类似资料:
  • 问题内容: JPA 2.0(Hibernate 4.2.4.Final/Spring 3.2.8.Release)/ Mysql 5.6 对于管理实体E w /自动生成的主键,例如 出于传统原因,foo需要等于:{id}:。例如,如果id为204,则foo将为“:204:”,因此在事务中发生这种情况是可行的 有没有一种更好的方法来计算值取决于生成的ID的派生列?没有上述技巧,即在持久化之后直接更新

  • 问题内容: 我正在使用 Solr-5.0.0 。我正在寻找一个领域。我需要添加一些规则以获得相关结果。 如果我搜索一个单词,如果存在完全匹配,则应排在最前面。例如:如果我搜索,它应该首先返回与 笔记本电脑 完全相同的内容。 如果我搜索多个单词,则应遵循规则1.单词长度最小的单词排在最前面。例如:如果我进行搜索,它应该比 Dell inspiron笔记本 电脑 先送回 Dell笔记本 电脑 。 如果

  • 问题内容: 我想对包含特定单词的所有锚应用不同的样式。可以在纯CSS中完成吗?如果仅CSS3,也可以。 问题答案: 编号曾经被提议过,但不在CSS3选择器的当前工作草案中。 您将需要一些JavaScript,例如:

  • 如何创建列colMap的ArrayType[StringType]哪个值是数组与元素是字符串匹配的列的名称哪些值为真? 我有这样的输入DataFrame: 我想创建这样的输出DataFrame: 编辑:我发现了这个重复的问题: Spark scala从多列中获取字符串类型的数组 但想知道是否有更好的方法来实现产出?

  • 我有一个DF,其中包含一个巨大的可解析元数据,作为Dataframe中的单个字符串列,我们称之为DFA,使用ColmnA。 我想通过一个函数,ClassXYZ = Func1(ColmnA)将ColmnA这一列分成多个列。这个函数返回一个具有多个变量的类ClassXYZ,现在每个变量都必须映射到新的列,比如ColmnA1、ColmnA2等。 我如何通过调用Func1一次来完成从一个数据帧到另一个数

  • 通过前面的学习我们知道 Java 实际上没有多维数组,只有一维数组。多维数组被解释为是数组的数组,所以因此会衍生出一种不规则数组。 规则的 4×3 二维数组有 12 个元素,而不规则数组就不一定了。如下代码静态初始化了一个不规则数组。 int intArray[][] = {{1,2}, {11}, {21,22,23}, {31,32,33}}; 高维数组(二维以及二维以上的数组称为高维数组)是