当前位置: 首页 > 知识库问答 >
问题:

UDF中Scala中Spark数据帧的ListType、MapType、structType字段的一般处理?

萧德庸
2023-03-14

如何在Scala中对Spark StructType执行常规处理,如按名称选择字段、在映射/列表字段上迭代等?

在spark dataframe中,我有类型为“ArrayType”的列“instances”,具有以下模式:

instances[ArrayType]:
    0 [ StructType:
            name [StringType]
            address[StringType]
            experiences[MapType]:
                Company-1[StringType]:
                    StructType:
                        numYears[IntType]: 5
                        grade[IntType]
                Company-2[StringType]:
                    StructType:
                        numYears[IntType]:  12
                        grade[IntType]]
     1 [ StructType:
            name [StringType]
            address[StringType]
            experiences[MapType]:
                Company-1[StringType]:
                    StructType:
                        numYears[IntType]: 3
                        grade[IntType]
                Company-2[StringType]:
                    StructType:
                        numYears[IntType]:  9
                        grade[IntType]]

我需要将ArrayType列“instances”转换为类型为的列“totalExperience”

derived column "totalExperience" of type "MapType"[StringType -> IntType]
company-1: 8
company-2: 21

注:(5 3=8和12 9=21)

等效psuedo代码

totalExperience = Map<String, Int>();
for (instance in instances) {
    for ((currentExperience, numYears) in instance.getExperiences().entries()) {
         if (!totalExperience.contains(currentExperience)) {
              totalExperience.put(currentExperience, 0);
         }

         totalExperience.put(currentExperience, totalExperience.get(currentExperience) + numYears);
    }
}

return totalExperience

我为此编写了UDF,如下所示,但我没有找到在Scala spark中实现上述伪代码的任何方法:

  private val computeTotalExperience = udf(_ => MapType = (instances: ArrayType) => {
    val totalExperienceByCompany = DataTypes.createMapType(StringType, LongType)

    **How to iterate over "instances" with type as "ArrayType" ?**
    for (instance <- instances) {
      **How to access and iterate over "experiences" mapType field on instance ???**
      // Populate totalExperienceByCompany(MapType) with key as "company-1" name

    }

    delayReasons
  })

如何在自定义项的Scala中对Spark dataframe的ListType、MapType、StructType字段执行上述一般处理?

共有1个答案

郑博
2023-03-14

检查以下代码。

scala> df.printSchema
root
 |-- instances: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- address: string (nullable = true)
 |    |    |-- experiences: map (nullable = true)
 |    |    |    |-- key: string
 |    |    |    |-- value: struct (valueContainsNull = true)
 |    |    |    |    |-- numYears: integer (nullable = true)
 |    |    |    |    |-- grade: string (nullable = true)
 |    |    |-- name: string (nullable = true)
scala> df.show(false)
+-----------------------------------------------------------------------------------------------------------------------------------+
|instances                                                                                                                          |
+-----------------------------------------------------------------------------------------------------------------------------------+
|[[address_0, [Company-1 -> [5, 1], Company-2 -> [12, 1]], name_0], [address_1, [Company-1 -> [3, 1], Company-2 -> [9, 1]], name_1]]|
+-----------------------------------------------------------------------------------------------------------------------------------+                                                                                                                                                                                                   
scala> 
val expr = array(
    struct(lit("company-1").as("company"),$"instance.experiences.Company-1.numYears"),
    struct(lit("company-2").as("company"),$"instance.experiences.Company-2.numYears")
)
       
scala>  

df
.withColumn("instance",explode($"instances"))
.withColumn("company",explode(expr))
.select("company.*")
.groupBy($"company")
.agg(sum($"numYears").as("numYears"))
.select(map($"company",$"numYears").as("totalExperience"))
.show(false) 
                                                                                                                                                       
+-----------------+                                                                                                                                                                                
|totalExperience  |                                                                                                                                                                                
+-----------------+                                                                                                                                                                                
|[company-1 -> 8] |                                                                                                                                                                                
|[company-2 -> 21]|                                                                                                                                                                                
+-----------------+                                                                                                                                                                                
                     
 类似资料:
  • 我有一个具有以下模式的数据帧: 我想使用一个UDF,它将user_loans_arr和new_loan作为输入,并将new_loan结构添加到现有的user_loans_arr中。然后,从user_loans_arr中删除loan_date超过12个月的所有元素。 提前谢谢。

  • 我正在寻找一种方法,将我的大型spark数据集划分为组/批,并在某些函数中处理这组行。所以基本上,这组行应该被输入到我的函数中,输出是我的单位,因为我不想聚合或更新输入记录,只是执行一些计算。 为了理解,假设我有以下输入。 假设我需要按col1和col2分组,这将给我以下分组 (1, A,1),(1, A,4),(1, A,5)--- (1,B,2)--- (1,C,3),(1,C,6)--- (

  • 我和Spark一起在Databricks上工作。编程语言是Scala。 我有两个数据帧: 主数据框:见截图:1 查找数据帧:参见屏幕截图3 我想: 查找主数据框中“年龄”=-1的所有行 我对如何做这件事伤了脑筋。我唯一想到的是将dataframe存储为DataRicks中的表,并使用SQL语句(SQL.Context.SQL…),结果非常复杂。 我想知道是否有更有效的方法。 编辑:添加可复制的示例

  • 我正在尝试创建一个传递给from_jsonAPI的structType模式,以便解析存储为JSON字符串的列。JSON数据包含一个Map,其中包含String键和struct类型的值,但每个struct的模式取决于键。 考虑这个JSON示例,其中“数据”列是一个具有值和的Map,并且每个值的架构都不同: 对于键“名称”,结构值有一个成员字段“first”。对于键“地址”,结构值有两个成员字段“St

  • 我想使用Spark和Scala强制转换dataframe的模式以更改某些列的类型。 具体地说,我正在尝试使用AS[U]函数,其描述为:“返回一个新的数据集,其中每个记录都映射到指定的类型。用于映射列的方法取决于U的类型。” 原则上,这正是我想要的,但我不能使它起作用。 下面是一个取自https://github.com/apache/spark/blob/master/sql/core/src/t

  • 我已经编写了以下代码,运行良好。但是我想连接UDF,这样代码可以压缩成几行。请建议我怎么做。下面是我编写的代码。