当前位置: 首页 > 知识库问答 >
问题:

如何写以数组[StructType],StructType为输入,返回数组[StructType]的Spark UDF

洪国兴
2023-03-14

我有一个具有以下模式的数据帧:

root
 |-- user_id: string (nullable = true)
 |-- user_loans_arr: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- loan_date: string (nullable = true)
 |    |    |-- loan_amount: string (nullable = true)
 |-- new_loan: struct (nullable = true)
 |    |-- loan_date : string (nullable = true)
 |    |-- loan_amount : string (nullable = true)

我想使用一个UDF,它将user_loans_arr和new_loan作为输入,并将new_loan结构添加到现有的user_loans_arr中。然后,从user_loans_arr中删除loan_date超过12个月的所有元素。

提前谢谢。

共有2个答案

晏晨朗
2023-03-14

您需要将数组和结构列作为数组或结构传递给udf。我更喜欢将它作为struct传递。在那里,您可以操作元素并返回数组类型。

import pyspark.sql.functions as F
from pyspark.sql.functions import udf
from pyspark.sql.types import *
import numpy as np
#Test data
tst = sqlContext.createDataFrame([(1,2,3,4),(3,4,5,1),(5,6,7,8),(7,8,9,2)],schema=['col1','col2','col3','col4'])
tst_1=(tst.withColumn("arr",F.array('col1','col2'))).withColumn("str",F.struct('col3','col4'))
# udf to return array
@udf(ArrayType(StringType()))
def fn(row):
    if(row.arr[1]>row.str.col4):
        res=[]
    else:
        res.append(row.str[i])        
        res = row.arr+row.str.asDict().values()        
    return(res)
# calling udf with a struct of array and struct column
tst_fin = tst_1.withColumn("res",fn(F.struct('arr','str')))

结果是

tst_fin.show()
+----+----+----+----+------+------+------------+
|col1|col2|col3|col4|   arr|   str|         res|
+----+----+----+----+------+------+------------+
|   1|   2|   3|   4|[1, 2]|[3, 4]|[1, 2, 4, 3]|
|   3|   4|   5|   1|[3, 4]|[5, 1]|          []|
|   5|   6|   7|   8|[5, 6]|[7, 8]|[5, 6, 8, 7]|
|   7|   8|   9|   2|[7, 8]|[9, 2]|          []|
+----+----+----+----+------+------+----------

这个例子把所有东西都当作int。因为你有字符串作为日期,在你的udf中,你必须使用python的datetime函数来进行比较。

钱志强
2023-03-14

如果火花

 val df = spark.sql(
      """
        |select user_id, user_loans_arr, new_loan
        |from values
        | ('u1', array(named_struct('loan_date', '2019-01-01', 'loan_amount', 100)), named_struct('loan_date',
        | '2020-01-01', 'loan_amount', 100)),
        | ('u2', array(named_struct('loan_date', '2020-01-01', 'loan_amount', 200)), named_struct('loan_date',
        | '2020-01-01', 'loan_amount', 100))
        | T(user_id, user_loans_arr, new_loan)
      """.stripMargin)
    df.show(false)
    df.printSchema()

    /**
      * +-------+-------------------+-----------------+
      * |user_id|user_loans_arr     |new_loan         |
      * +-------+-------------------+-----------------+
      * |u1     |[[2019-01-01, 100]]|[2020-01-01, 100]|
      * |u2     |[[2020-01-01, 200]]|[2020-01-01, 100]|
      * +-------+-------------------+-----------------+
      *
      * root
      * |-- user_id: string (nullable = false)
      * |-- user_loans_arr: array (nullable = false)
      * |    |-- element: struct (containsNull = false)
      * |    |    |-- loan_date: string (nullable = false)
      * |    |    |-- loan_amount: integer (nullable = false)
      * |-- new_loan: struct (nullable = false)
      * |    |-- loan_date: string (nullable = false)
      * |    |-- loan_amount: integer (nullable = false)
      */

user_loans_arr和new_loan作为输入,并将new_loan结构添加到现有user_loans_arr。然后,从user_loans_arr删除loan_date超过12个月的所有元素。

<代码>火花

    df.withColumn("user_loans_arr",
      expr(
        """
          |FILTER(array_union(user_loans_arr, array(new_loan)),
          | x -> months_between(current_date(), to_date(x.loan_date)) < 12)
        """.stripMargin))
      .show(false)

    /**
      * +-------+--------------------------------------+-----------------+
      * |user_id|user_loans_arr                        |new_loan         |
      * +-------+--------------------------------------+-----------------+
      * |u1     |[[2020-01-01, 100]]                   |[2020-01-01, 100]|
      * |u2     |[[2020-01-01, 200], [2020-01-01, 100]]|[2020-01-01, 100]|
      * +-------+--------------------------------------+-----------------+
      */

<代码>火花

 // spark < 2.4
    val outputSchema = df.schema("user_loans_arr").dataType

    import java.time._
    val add_and_filter = udf((userLoansArr: mutable.WrappedArray[Row], loan: Row) => {
      (userLoansArr :+ loan).filter(row => {
        val loanDate = LocalDate.parse(row.getAs[String]("loan_date"))
        val period = Period.between(loanDate, LocalDate.now())
        period.getYears * 12 + period.getMonths < 12
      })
    }, outputSchema)

    df.withColumn("user_loans_arr", add_and_filter($"user_loans_arr", $"new_loan"))
      .show(false)

    /**
      * +-------+--------------------------------------+-----------------+
      * |user_id|user_loans_arr                        |new_loan         |
      * +-------+--------------------------------------+-----------------+
      * |u1     |[[2020-01-01, 100]]                   |[2020-01-01, 100]|
      * |u2     |[[2020-01-01, 200], [2020-01-01, 100]]|[2020-01-01, 100]|
      * +-------+--------------------------------------+-----------------+
      */

 类似资料:
  • 我知道,Case类是最小的正则类,而StructType是一种spark数据类型,它是StructFields的集合。 但是我们可以使用Case类和StructType以类似的方式创建数据帧和其他用例。 想要理解 在什么情况下,我们应该选择其中一种,为什么

  • 我有一个以XML形式出现的数据集,其中一个节点包含JSON。Spark将其作为StringType读入,因此我尝试使用from_json()将json转换为数据帧。 我可以将字符串转换为JSON,但如何编写模式来处理数组? 没有数组的字符串-工作得很好 带数组的字符串 - 无法弄清楚这个

  • 这实际上与我之前的问题相同,但使用Avro而不是JSON作为数据格式。 我正在使用一个Spark数据框架,它可以从几个不同的模式版本之一加载数据: 我正在使用Spark Avro加载数据。 它可能是版本一文件或版本二文件。但是我希望能够以相同的方式处理它,将未知值设置为“null”。我之前的问题中的建议是设置模式,但是我不想重复自己在文件中编写模式,也不想重复自己在和朋友中编写模式。如何将avro

  • 我是新的spark和python,面临着从元数据文件构建模式的困难,该模式可以应用于我的数据文件。场景:数据文件的元数据文件(csv格式),包含列及其类型:例如: 我已成功将其转换为如下数据帧: 但是当我尝试用这个将其转换为StructField格式时 或 然后使用 我得到以下错误: 一旦我准备好了模式,我想使用createDataFrame来应用于我的数据文件。这个过程必须为许多表完成,所以我不

  • 我有一个类似这样的JSON: 我正在尝试将此结构映射到 Spark 架构。我已经创建了以下内容;但是它不起作用。我还尝试在值字段映射中移除。 另外,请注意,它们“key1”和“key2”是动态字段,将使用唯一标识符生成。也可以有两个以上的键。有没有人能够将数组类型映射到结构类型?

  • 我有如下所示的示例数据,我需要使用spark scala代码将列(ABS,ALT)从字符串转换为数组[structType]。任何帮助都将不胜感激。 在UDF的帮助下,我能够从字符串转换为arrayType,但需要一些帮助来将这两列(ABS、ALT)的字符串转换为数组[structType]。 df。预期架构: