当前位置: 首页 > 知识库问答 >
问题:

在火花scala中使用结构创建模式

柯良骏
2023-03-14

我是scala新手,尝试从元素数组中创建自定义模式,以读取基于新自定义模式的文件。

我正在从json文件中读取数组,并使用爆炸方法为列数组中的每个元素创建了一个数据框。

val otherPeople = sqlContext.read.option("multiline", "true").json(otherPeopleDataset)
val column_values = otherPeople.withColumn("columns", explode($"columns")).select("columns.*")
column_values.printSchema()

获得的输出为:

column_values: org.apache.spark.sql.DataFrame = [column_id: string, data_sensitivty: string ... 3 more fields]
root
 |-- column_id: string (nullable = true)
 |-- data_sensitivty: string (nullable = true)
 |-- datatype: string (nullable = true)
 |-- length: string (nullable = true)
 |-- name: string (nullable = true)

val column_values = ddb_schema.withColumn("columns", explode($"columns")).select("columns.*")
val column_name = column_values.select("name", "datatype", "length")

column_name.show(4)


 +------------------+--------+------+
 |              name|datatype|length|
 +------------------+--------+------+
 |     object_number| varchar|   100|
 |     function_type| varchar|   100|
 |             hof_1| decimal|  17,3|
 |             hof_2| decimal|  17,2|
 |            region| varchar|   100|
 |           country| varchar|  null|
 +------------------+--------+------+

现在,对于上面列出的所有值,我尝试使用下面的代码动态创建val模式

val schemaColumns = column_name.collect()
val schema = schemaColumns.foldLeft(new StructType())(
  (schema, columnRow) => schema.add(columnRow.getAs[String]("name"), getFieldType(columnRow.getAs[String]("datatype")), true)
  )

def getFieldType(typeName: String): DataType = typeName match {
    case "varchar" => StringType
    // TODO include other types here
    case _ => StringType
  }

上面的问题是,我能够在struct中获取数据类型,但我也希望仅为数据类型decimal获取(scale和preicion),其限制条件为max allowable,条件是如果decimal的长度if为null或不存在,我们需要取默认值为(10,0),如果存在的值大于38,我们需要取默认值为(38,0)

共有2个答案

萧阳波
2023-03-14

可以按此处指定的方式创建具有精度的十进制数据类型:

 DataTypes.createDecimalType()

在函数“getFieldType”中,可以添加十进制类型的情况,即smth。例如:

case "decimal" => DataTypes.createDecimalType(10,0)
云欣嘉
2023-03-14

这种方法效果很好。

我向您展示了一个完整的示例,该示例完成了代码和预期结果。

您可以在val data中引入更多变体。

  /**
    * to obtain a tuple with precision and scale
    * @param precision Option[String]
    * @return (Int, Int)
    */
  def getDecimalScale(precision: Option[String]): (Int, Int) = {
    precision match {
      case Some(pr) => {
        pr.split(",").toList match {
          case List(h, _) if h.toInt >= 38 => (38,0)
          case List(h, t) => (h.toInt,t.head.toString.toInt)
          case _ => (10, 0)
        }
      }
      case None => (10, 0)
    }
  }
    val data = List(("object_number", "varchar", "100"), ("function_type", "varchar", "100"),
      ("hof_1", "decimal", "17,3"), ("hof_2", "decimal", "17,2"),
      ("hof_3", "decimal", null),("hof_4", "decimal", "39,2"),
      ("region", "varchar", "100"), ("country", "varchar", null))

    import spark.implicits._

    val column_name = sc.parallelize(data).toDF("name","datatype","length")

    column_name.show()
/*
+-------------+--------+------+
|         name|datatype|length|
+-------------+--------+------+
|object_number| varchar|   100|
|function_type| varchar|   100|
|        hof_1| decimal|  17,3|
|        hof_2| decimal|  17,2|
|        hof_3| decimal|  null|
|        hof_4| decimal|  39,2|
|       region| varchar|   100|
|      country| varchar|  null|
+-------------+--------+------+
*/

    val schemaColumns = column_name.collect()
    schemaColumns.foreach(println)
/*
[object_number,varchar,100]
[function_type,varchar,100]
[hof_1,decimal,17,3]
[hof_2,decimal,17,2]
[hof_3,decimal,null]
[hof_4,decimal,39,2]
[region,varchar,100]
[country,varchar,null]
*/

    val schema = schemaColumns.foldLeft(new StructType())(
      (schema, columnRow) => {
        columnRow.getAs[String]("datatype") match {
          case "varchar" => schema.add(columnRow.getAs[String]("name"), StringType, true)
          case "decimal" => {
            val (pr, sc) = getDecimalScale(Option(columnRow.getAs[String]("length")))
            schema.add(columnRow.getAs[String]("name"), new DecimalType(precision = pr, scale = sc), true)
          }
          case _ => schema.add(columnRow.getAs[String]("name"), StringType, true)
        }
      }
    )

    schema.printTreeString()
/*
root
 |-- object_number: string (nullable = true)
 |-- function_type: string (nullable = true)
 |-- hof_1: decimal(17,3) (nullable = true)
 |-- hof_2: decimal(17,2) (nullable = true)
 |-- hof_3: decimal(10,0) (nullable = true)
 |-- hof_4: decimal(38,0) (nullable = true)
 |-- region: string (nullable = true)
 |-- country: string (nullable = true)
*/
 类似资料:
  • 我正在运行以下scala代码: 我知道firstStruct是structType,StructFields的一个名称是“name”,但在尝试强制转换时似乎失败了。我被告知spark/hive结构与scala不同,但为了使用structType,我需要 所以我想他们应该是同一种类型的。 我看了看这里:https://github.com/apache/spark/blob/master/sql/c

  • 嗨,我正在尝试生成Salt示例的输出,但没有使用文档中提到的docker。我找到了帮助生成输出的scala代码,这是main.scala。我将main.scala修改为一个方便的main.scala, 我为这个scala创建了一个单独的文件夹, calac-cp“lib/salt.jar:lib/spark.jar”main.scala 这已成功运行并在文件夹BinexTest下生成类。 现在,项

  • 如何在火花scala数据帧(非文本)api中访问geomesas UDF?即如何转换 如何使sql UDF在scala数据帧DSL中的文本spark sql API中可用?即如何启用而不是此表达式 类似于 如何注册Geomesa UDF,使其不仅适用于sql文本模式<代码>SQLTypes。init(spark.sqlContext)fromhttps://github.com/locationt

  • 想象一下下面的代码: 如何定义myUdf的返回类型,以便查看代码的人立即知道它返回了一个Double?

  • 我在火花数据帧中有一个“结构类型”列,它有一个数组和一个字符串作为子字段。我想修改数组并返回相同类型的新列。我可以用UDF处理它吗?或者有什么替代方案? 似乎我需要行类型的UDF,类似 这是有意义的,因为Spark不知道返回类型的模式。不幸的是,udf.register也失败了:

  • 我正在寻找(一些测试没有成功)在不使用UDF的情况下从case语句返回元组结构类型,有什么方法可以做到这一点吗? 用例是:我们有两列依赖于case表达式中的相同条件,因此我们看到两个选项: 写入相同条件两次,但返回不同列(不需要) 写一次条件,但每次都返回2个值,这可以通过一个元组,然后我们将其拆分 我知道这可以使用UDF来完成,但是我们避免了UDF,因为被火花视为黑盒,因此它们是不可优化的,所以