当前位置: 首页 > 面试题库 >

如何将新的Struct列添加到DataFrame

端木鹏
2023-03-14
问题内容

我目前正在尝试从MongoDB中提取数据库,并使用Spark来将其提取到ElasticSearch中geo_points

Mongo数据库具有纬度和经度值,但是ElasticSearch要求将它们强制转换为geo_point类型。

Spark中是否可以将latand lon列复制到arrayor 的新列struct

任何帮助表示赞赏!


问题答案:

我假设您从某种平面模式开始,如下所示:

root
 |-- lat: double (nullable = false)
 |-- long: double (nullable = false)
 |-- key: string (nullable = false)

首先让我们创建示例数据:

import org.apache.spark.sql.Row
import org.apache.spark.sql.functions.{col, udf}
import org.apache.spark.sql.types._

val rdd = sc.parallelize(
    Row(52.23, 21.01, "Warsaw") :: Row(42.30, 9.15, "Corte") :: Nil)

val schema = StructType(
    StructField("lat", DoubleType, false) ::
    StructField("long", DoubleType, false) ::
    StructField("key", StringType, false) ::Nil)

val df = sqlContext.createDataFrame(rdd, schema)

一种简单的方法是使用udf和case类:

case class Location(lat: Double, long: Double)
val makeLocation = udf((lat: Double, long: Double) => Location(lat, long))

val dfRes = df.
   withColumn("location", makeLocation(col("lat"), col("long"))).
   drop("lat").
   drop("long")

dfRes.printSchema

我们得到

root
 |-- key: string (nullable = false)
 |-- location: struct (nullable = true)
 |    |-- lat: double (nullable = false)
 |    |-- long: double (nullable = false)

一种困难的方法是转换数据并随后应用模式:

val rddRes = df.
    map{case Row(lat, long, key) => Row(key, Row(lat, long))}

val schemaRes = StructType(
    StructField("key", StringType, false) ::
    StructField("location", StructType(
        StructField("lat", DoubleType, false) ::
        StructField("long", DoubleType, false) :: Nil
    ), true) :: Nil 
)

sqlContext.createDataFrame(rddRes, schemaRes).show

我们得到了预期的输出

+------+-------------+
|   key|     location|
+------+-------------+
|Warsaw|[52.23,21.01]|
| Corte|  [42.3,9.15]|
+------+-------------+

从头开始创建嵌套模式可能很繁琐,因此,如果可以的话,我建议您采用第一种方法。如果需要更复杂的结构,可以轻松扩展它:

case class Pin(location: Location)
val makePin = udf((lat: Double, long: Double) => Pin(Location(lat, long))

df.
    withColumn("pin", makePin(col("lat"), col("long"))).
    drop("lat").
    drop("long").
    printSchema

我们得到预期的输出:

root
 |-- key: string (nullable = false)
 |-- pin: struct (nullable = true)
 |    |-- location: struct (nullable = true)
 |    |    |-- lat: double (nullable = false)
 |    |    |-- long: double (nullable = false)

不幸的是,您无法控制nullable字段,因此如果对您的项目很重要,则必须指定架构。

最后,您可以使用struct1.4中引入的功能:

import org.apache.spark.sql.functions.struct

df.select($"key", struct($"lat", $"long").alias("location"))


 类似资料:
  • 问题内容: 我想在我的Gradle版本(版本1.0)中添加集成测试。它们应与我的常规测试分开运行,因为它们需要将webapp部署到本地主机(它们测试该webapp)。这些测试应该能够使用在我的主要源代码集中定义的类。我如何做到这一点? 问题答案: 这花了我一段时间才能弄清楚,在线资源也不是很好。所以我想记录我的解决方案。 这是一个简单的gradle构建脚本,除了主要和测试源集之外,还具有intTe

  • 问题内容: 如何添加到MySQL表的现有列? 问题答案: 我认为您想按照命令中的说明进行操作。可能是这样的: 在上面运行之前,请确保该列具有主索引。

  • 问题内容: 我想更新SQL表中的1列。示例:列中的当前值是这样的 现在,我想像这样更新整个专栏:www.mypizza.com/2013/09/pizzalover.jpg有什么办法可以做到这一点?提前致谢 问题答案: 您可以使用语句简单地更新列

  • 问题内容: 我正在尝试向从创建的数组中添加一列。在这种情况下,它是一个数组:(行,列)。 我想添加第九列。空或零都无所谓。 问题答案: 我认为您的问题是您希望就地添加该列,但是由于存储的numpy数据的原因,它的作用是创建连接数组的副本 所以你需要保存输出: 替代方式: 我相信这三个函数(以及)之间的唯一区别是未指定when的默认行为: 假设 假设除非输入为1d,否则 如果输入为1d,则假定在添加

  • 问题内容: 我正在尝试找到一种将图像添加到JavaFx TableView列的方法,该图像具有通过hibernate从H2数据库填充的其他列中的数据。TableView是在JavaFx Scene Builder中设计的。 到目前为止,这是我设法做到的: 控制器类: 我说那是一个错误。 这是我第一次尝试将图像添加到TableView中。 从昨天开始,我一直四处张望,但现在似乎被困住了。我希望能有所

  • 问题内容: 将空列添加到pandas对象的最简单方法是什么?我偶然发现的最好的东西是 有没有那么不合常理的方法? 问题答案: 如果我理解正确,则应填写作业: