我是scala新手,尝试从元素数组中创建自定义模式,以读取基于新自定义模式的文件。
我正在从json文件中读取数组,并使用爆炸方法为列数组中的每个元素创建了一个数据框。
val otherPeople = sqlContext.read.option("multiline", "true").json(otherPeopleDataset)
val column_values = otherPeople.withColumn("columns", explode($"columns")).select("columns.*")
column_values.printSchema()
获得的输出为:
column_values: org.apache.spark.sql.DataFrame = [column_id: string, data_sensitivty: string ... 3 more fields]
root
|-- column_id: string (nullable = true)
|-- data_sensitivty: string (nullable = true)
|-- datatype: string (nullable = true)
|-- length: string (nullable = true)
|-- name: string (nullable = true)
val column_values = ddb_schema.withColumn("columns", explode($"columns")).select("columns.*")
val column_name = column_values.select("name", "datatype", "length")
column_name.show(4)
+------------------+--------+------+
| name|datatype|length|
+------------------+--------+------+
| object_number| varchar| 100|
| function_type| varchar| 100|
| hof_1| decimal| 17,3|
| hof_2| decimal| 17,2|
| region| varchar| 100|
| country| varchar| null|
+------------------+--------+------+
现在,对于上面列出的所有值,我尝试使用下面的代码动态创建val模式
val schemaColumns = column_name.collect()
val schema = schemaColumns.foldLeft(new StructType())(
(schema, columnRow) => schema.add(columnRow.getAs[String]("name"), getFieldType(columnRow.getAs[String]("datatype")), true)
)
def getFieldType(typeName: String): DataType = typeName match {
case "varchar" => StringType
// TODO include other types here
case _ => StringType
}
上面的问题是,我能够在struct中获取数据类型,但我也希望仅为数据类型decimal获取(scale和preicion),其限制条件为max allowable,条件是如果decimal的长度if为null或不存在,我们需要取默认值为(10,0),如果存在的值大于38,我们需要取默认值为(38,0)
可以按此处指定的方式创建具有精度的十进制数据类型:
DataTypes.createDecimalType()
在函数“getFieldType”中,可以添加十进制类型的情况,即smth。例如:
case "decimal" => DataTypes.createDecimalType(10,0)
这种方法效果很好。
我向您展示了一个完整的示例,该示例完成了代码和预期结果。
您可以在val data
中引入更多变体。
/**
* to obtain a tuple with precision and scale
* @param precision Option[String]
* @return (Int, Int)
*/
def getDecimalScale(precision: Option[String]): (Int, Int) = {
precision match {
case Some(pr) => {
pr.split(",").toList match {
case List(h, _) if h.toInt >= 38 => (38,0)
case List(h, t) => (h.toInt,t.head.toString.toInt)
case _ => (10, 0)
}
}
case None => (10, 0)
}
}
val data = List(("object_number", "varchar", "100"), ("function_type", "varchar", "100"),
("hof_1", "decimal", "17,3"), ("hof_2", "decimal", "17,2"),
("hof_3", "decimal", null),("hof_4", "decimal", "39,2"),
("region", "varchar", "100"), ("country", "varchar", null))
import spark.implicits._
val column_name = sc.parallelize(data).toDF("name","datatype","length")
column_name.show()
/*
+-------------+--------+------+
| name|datatype|length|
+-------------+--------+------+
|object_number| varchar| 100|
|function_type| varchar| 100|
| hof_1| decimal| 17,3|
| hof_2| decimal| 17,2|
| hof_3| decimal| null|
| hof_4| decimal| 39,2|
| region| varchar| 100|
| country| varchar| null|
+-------------+--------+------+
*/
val schemaColumns = column_name.collect()
schemaColumns.foreach(println)
/*
[object_number,varchar,100]
[function_type,varchar,100]
[hof_1,decimal,17,3]
[hof_2,decimal,17,2]
[hof_3,decimal,null]
[hof_4,decimal,39,2]
[region,varchar,100]
[country,varchar,null]
*/
val schema = schemaColumns.foldLeft(new StructType())(
(schema, columnRow) => {
columnRow.getAs[String]("datatype") match {
case "varchar" => schema.add(columnRow.getAs[String]("name"), StringType, true)
case "decimal" => {
val (pr, sc) = getDecimalScale(Option(columnRow.getAs[String]("length")))
schema.add(columnRow.getAs[String]("name"), new DecimalType(precision = pr, scale = sc), true)
}
case _ => schema.add(columnRow.getAs[String]("name"), StringType, true)
}
}
)
schema.printTreeString()
/*
root
|-- object_number: string (nullable = true)
|-- function_type: string (nullable = true)
|-- hof_1: decimal(17,3) (nullable = true)
|-- hof_2: decimal(17,2) (nullable = true)
|-- hof_3: decimal(10,0) (nullable = true)
|-- hof_4: decimal(38,0) (nullable = true)
|-- region: string (nullable = true)
|-- country: string (nullable = true)
*/
我正在运行以下scala代码: 我知道firstStruct是structType,StructFields的一个名称是“name”,但在尝试强制转换时似乎失败了。我被告知spark/hive结构与scala不同,但为了使用structType,我需要 所以我想他们应该是同一种类型的。 我看了看这里:https://github.com/apache/spark/blob/master/sql/c
嗨,我正在尝试生成Salt示例的输出,但没有使用文档中提到的docker。我找到了帮助生成输出的scala代码,这是main.scala。我将main.scala修改为一个方便的main.scala, 我为这个scala创建了一个单独的文件夹, calac-cp“lib/salt.jar:lib/spark.jar”main.scala 这已成功运行并在文件夹BinexTest下生成类。 现在,项
如何在火花scala数据帧(非文本)api中访问geomesas UDF?即如何转换 如何使sql UDF在scala数据帧DSL中的文本spark sql API中可用?即如何启用而不是此表达式 类似于 如何注册Geomesa UDF,使其不仅适用于sql文本模式<代码>SQLTypes。init(spark.sqlContext)fromhttps://github.com/locationt
想象一下下面的代码: 如何定义myUdf的返回类型,以便查看代码的人立即知道它返回了一个Double?
我在火花数据帧中有一个“结构类型”列,它有一个数组和一个字符串作为子字段。我想修改数组并返回相同类型的新列。我可以用UDF处理它吗?或者有什么替代方案? 似乎我需要行类型的UDF,类似 这是有意义的,因为Spark不知道返回类型的模式。不幸的是,udf.register也失败了:
我正在寻找(一些测试没有成功)在不使用UDF的情况下从case语句返回元组结构类型,有什么方法可以做到这一点吗? 用例是:我们有两列依赖于case表达式中的相同条件,因此我们看到两个选项: 写入相同条件两次,但返回不同列(不需要) 写一次条件,但每次都返回2个值,这可以通过一个元组,然后我们将其拆分 我知道这可以使用UDF来完成,但是我们避免了UDF,因为被火花视为黑盒,因此它们是不可优化的,所以