当前位置: 首页 > 知识库问答 >
问题:

使用UDF在Spark DataFrame中创建新列

郜谦
2023-03-14
  val myUdf = udf((col_abc: String, col_xyz: String) => {
    array(
      struct(
        lit("x").alias("col1"),
        col(col_abc).alias("col2"),
        col(col_xyz).alias("col3")
      )
    )
  }

现在,我想在一个函数中使用这个,如下所示-

def myfunc(): Column = {
    val myvariable = myUdf($"col_abc", $"col_xyz")
    myvariable
}

然后使用此函数在我的DataFrame中创建一个新列

val df = df..withColumn("new_col", myfunc())

总之,我希望我的列“new_col”是一个类型数组,其值为[[x,x,x]]

我得到以下错误。我在这里做错了什么?

原因:java.lang.UnsupportedOperationException:不支持org.apache.spark.sql.Column类型的模式

共有1个答案

卞安邦
2023-03-14

两条路。

  1. 不要使用UDF,因为您使用的是纯火花函数:
val myUdf = ((col_abc: String, col_xyz: String) => {
    array(
      struct(
        lit("x").alias("col1"),
        col(col_abc).alias("col2"),
        col(col_xyz).alias("col3")
      )
    )
  }
)

def myfunc(): Column = {
    val myvariable = myUdf("col_abc", "col_xyz")
    myvariable
}

df.withColumn("new_col", myfunc()).show
+-------+-------+---------------+
|col_abc|col_xyz|        new_col|
+-------+-------+---------------+
|    abc|    xyz|[[x, abc, xyz]]|
+-------+-------+---------------+
case class cols (col1: String, col2: String, col3: String)

val myUdf = udf((col_abc: String, col_xyz: String) => Seq(cols("x", col_abc, col_xyz)))

def myfunc(): Column = {
    val myvariable = myUdf($"col_abc", $"col_xyz")
    myvariable
}

df.withColumn("new_col", myfunc()).show
+-------+-------+---------------+
|col_abc|col_xyz|        new_col|
+-------+-------+---------------+
|    abc|    xyz|[[x, abc, xyz]]|
+-------+-------+---------------+

如果要向函数传递列,这里有一个示例:

val myUdf = ((col_abc: Column, col_xyz: Column) => {
    array(
      struct(
        lit("x").alias("col1"),
        col_abc.alias("col2"),
        col_xyz.alias("col3")
      )
    )
  }
)
 类似资料:
  • 如何将一个新的列事件添加到数据帧中,该事件将是< code>generate_header的结果?如何添加一行作为列值? 可能我们需要将函数转换为UDF 假设我们有这样的东西 我们想得到这个

  • 我在 RabbitMQ 中创建新队列时遇到了一些问题。我只创建一个使用者客户端,该客户端将使用来自另一个微服务的消息。 这是我到目前为止所做的。 应用程序属性: 配置类: 和侦听器类: 当我运行这个程序时,我有一条ACCESS_REFUSED消息,但我不知道为什么。我错过了什么吗?? 谢谢

  • 问题内容: 问题 我想在Java中创建一个用户定义函数,可以将其称为Apache Spark运算符链中的Java方法。我在查找不需要UDF存在于SQL查询中的Java示例时遇到了麻烦。 版本号 Java 8 斯卡拉2.10.6 为Hadoop 2.6.0预先构建的Apache Spark 1.6.0 我尝试过的方法 我可以用Java成功创建UDF。但是,除非在SQL查询中,否则无法使用它: 我被困

  • WordPress有一个管理仪表板。在仪表板中,我们可以作为管理员添加新用户。我想在管理员添加新用户时在MySQL中创建一个表。例如,我创建了一个名为John Smith的用户,其用户名为user1;当我成功添加该用户时,将在名为user1的数据库中创建一个表。

  • 您的回应将不胜感激。谢了!

  • 添加/home/cloudera/date.jar到类路径添加资源:/home/cloudera/date.jar 请有人帮帮我,因为我是新来蜂巢的。有人能告诉我要遵循的步骤吗