当前位置: 首页 > 知识库问答 >
问题:

在UDF中更改Spark结构和structfield名称

汪飞捷
2023-03-14

我正在尝试将火花中的结构传递给 udf。它正在更改字段名称并重命名为列位置。如何修复此问题?

object TestCSV {

          def main(args: Array[String]) {

            val conf = new SparkConf().setAppName("localTest").setMaster("local")
            val sc = new SparkContext(conf)
            val sqlContext = new SQLContext(sc)


            val inputData = sqlContext.read.format("com.databricks.spark.csv")
                  .option("delimiter","|")
                  .option("header", "true")
                  .load("test.csv")


            inputData.printSchema()

            inputData.show()

            val groupedData = inputData.withColumn("name",struct(inputData("firstname"),inputData("lastname")))

            val udfApply = groupedData.withColumn("newName",processName(groupedData("name")))

           udfApply.show()
          }



             def processName = udf((input:Row) =>{

                println(input)
                println(input.schema)

                Map("firstName" -> input.getAs[String]("firstname"), "lastName" -> input.getAs[String]("lastname"))

              })

        }

输出:

 root
 |-- id: string (nullable = true)
 |-- firstname: string (nullable = true)
 |-- lastname: string (nullable = true)

 +---+---------+--------+
 | id|firstname|lastname|
 +---+---------+--------+
 |  1|     jack| reacher|
 |  2|     john|     Doe|
 +---+---------+--------+

错误:

[jack,reacher]struct type(struct field(I[1],StringType,true),

共有1个答案

冯旭
2023-03-14

你遇到的事情真的很奇怪。玩了一会儿后,我终于发现这可能与优化引擎的问题有关。似乎问题不在于UDF,而在于< code>struct函数。

我让它工作(火花1.6.3)当我缓存group Data,没有缓存我得到你报告的异常:

import org.apache.spark.sql.Row
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.{SparkConf, SparkContext}


object Demo {

  def main(args: Array[String]): Unit = {

    val sc = new SparkContext(new SparkConf().setAppName("Demo").setMaster("local[1]"))
    val sqlContext = new HiveContext(sc)
    import sqlContext.implicits._
    import org.apache.spark.sql.functions._


    def processName = udf((input: Row) => {
      Map("firstName" -> input.getAs[String]("firstname"), "lastName" -> input.getAs[String]("lastname"))
    })


    val inputData =
      sc.parallelize(
        Seq(("1", "Kevin", "Costner"))
      ).toDF("id", "firstname", "lastname")


    val groupedData = inputData.withColumn("name", struct(inputData("firstname"), inputData("lastname")))
      .cache() // does not work without cache

    val udfApply = groupedData.withColumn("newName", processName(groupedData("name")))
    udfApply.show()
  }
}

或者,您可以使用RDD API来制作您的结构,但这并不太好:

case class Name(firstname:String,lastname:String) // define outside main

val groupedData = inputData.rdd
    .map{r =>
        (r.getAs[String]("id"),
          Name(
            r.getAs[String]("firstname"),
            r.getAs[String]("lastname")
          )
        )
    }
   .toDF("id","name")
 类似资料:
  • 问题内容: 我有一个JSON数据结构,如下所示: 我需要将键名称从“名称”和“子代”更改为“键”和“值”。关于如何为该嵌套结构中的每个键名执行此操作的任何建议? 问题答案: 我不知道为什么JSON标记的末尾会有分号 (假设这就是问题中所代表的意思) ,但是如果删除了分号,则可以在分析数据时使用 reviver函数 进行修改。 演示: http : //jsfiddle.net/BeSad/

  • 我已经编写了以下代码,运行良好。但是我想连接UDF,这样代码可以压缩成几行。请建议我怎么做。下面是我编写的代码。

  • 问题内容: http://play.golang.org/p/vhaKi5uVmm [第一个问题] 我们如何以及为什么需要这种看起来很奇怪的结构?它是空结构还是匿名结构?我用谷歌搜索,但是找不到正确的答案或说明文档。 原始资料来自Andrew Gerrand的演讲 http://nf.wh3rd.net/10things/#10 这里 完成是struct {}类型的通道 所以我尝试了 但这是行不通

  • 我正在考虑创建XML文件的Java生成器,然后由另一个Java程序加载这些文件(我不能更改那里的代码)。很明显的答案是JAXB,但是我偶然发现了一个问题。 所有三个XML都将是生成器的有效输出。我已经为fistElement、secondElement等创建了Java类(有很多),但我想不出如何将它们全部添加到一个HeadTag元素下。 我有很多元素,超过500个,所以把它们作为一个字段放在课堂上

  • 命令用于在表中进行更改。参考以下操作步骤: 可以执行以下更改操作: 添加新列 添加主键 添加外键 添加约束 更改列的类型 删除列 删除主键 删除外键 删除约束 下面以为指定表添加一列为例: 假设在“my_table2”表中添加一个新列“department”,数据类型为:,长度为:,如下图所示: 执行成功,如下所示: 查看 的表结构,如下所示: 以同样的方式添加主键: 添加外键 添加约束 更改列的

  • 我有以下结构: