当前位置: 首页 > 知识库问答 >
问题:

使用Scala获取与Spark数据集中最新时间戳对应的行

郭麒
2023-03-14

我对Spark和Scala比较陌生。我有一个数据帧,它有以下格式:

| Col1 | Col2 | Col3 | Col_4 | Col_5 | Col_TS                  | Col_7 | 

| 1234 | AAAA | 1111 | afsdf | ewqre | 1970-01-01 00:00:00.0   | false |
| 1234 | AAAA | 1111 | ewqrw | dafda | 2017-01-17 07:09:32.748 | true  |
| 1234 | AAAA | 1111 | dafsd | afwew | 2015-01-17 07:09:32.748 | false |
| 5678 | BBBB | 2222 | afsdf | qwerq | 1970-01-01 00:00:00.0   | true  |
| 5678 | BBBB | 2222 | bafva | qweqe | 2016-12-08 07:58:43.04  | false |
| 9101 | CCCC | 3333 | caxad | fsdaa | 1970-01-01 00:00:00.0   | false |

我需要做的是获取对应于最新时间戳的行。在上面的示例中,键是Col1、Col2和Col3。Col_TS表示时间戳,Col_7是一个布尔值,用于确定记录的有效性。我想做的是找到一种方法,根据密钥对这些记录进行分组,并保留具有最新时间戳的记录。

因此,上述数据框中的操作输出应为:

| Col1 | Col2 | Col3 | Col_4 | Col_5 | Col_TS                  | Col_7 | 

| 1234 | AAAA | 1111 | ewqrw | dafda | 2017-01-17 07:09:32.748 | true  |
| 5678 | BBBB | 2222 | bafva | qweqe | 2016-12-08 07:58:43.04  | false |
| 9101 | CCCC | 3333 | caxad | fsdaa | 1970-01-01 00:00:00.0   | false |

我想出了一个部分解决方案,但是这样我只能返回记录分组的列键的数据帧,而不是其他列。

df = df.groupBy("Col1","Col2","Col3").agg(max("Col_TS"))


| Col1 | Col2 | Col3 | max(Col_TS)             |

| 1234 | AAAA | 1111 | 2017-01-17 07:09:32.748 |
| 5678 | BBBB | 2222 | 2016-12-08 07:58:43.04  | 
| 9101 | CCCC | 3333 | 1970-01-01 00:00:00.0   | 

有人能帮我想出执行此操作的Scala代码吗?

共有2个答案

杨征
2023-03-14

一个选项是首先按列对数据帧进行排序,然后按列1、列2和列3分组,并从每个列中选取最后一项:

val val_columns = Seq("Col_4", "Col_5", "Col_TS", "Col_7").map(x => last(col(x)).alias(x))

(df.orderBy("Col_TS")
   .groupBy("Col1", "Col2", "Col3")
   .agg(val_columns.head, val_columns.tail: _*).show)

+----+----+----+-----+-----+--------------------+-----+
|Col1|Col2|Col3|Col_4|Col_5|              Col_TS|Col_7|
+----+----+----+-----+-----+--------------------+-----+
|1234|AAAA|1111|ewqrw|dafda|2017-01-17 07:09:...| true|
|9101|CCCC|3333|caxad|fsdaa|1970-01-01 00:00:...|false|
|5678|BBBB|2222|bafva|qweqe|2016-12-08 07:58:...|false|
+----+----+----+-----+-----+--------------------+-----+
巫化
2023-03-14

您可以按如下方式使用窗口功能

import org.apache.spark.sql.functions._
val windowSpec = Window.partitionBy("Col1","Col2","Col3").orderBy(col("Col_TS").desc)

df.withColumn("maxTS", first("Col_TS").over(windowSpec))
.select("*").where(col("maxTS") === col("Col_TS"))
.drop("maxTS")
  .show(false)

您应该得到如下输出

+----+----+----+-----+-----+----------------------+-----+
|Col1|Col2|Col3|Col_4|Col_5|Col_TS                |Col_7|
+----+----+----+-----+-----+----------------------+-----+
|5678|BBBB|2222|bafva|qweqe|2016-12-0807:58:43.04 |false|
|1234|AAAA|1111|ewqrw|dafda|2017-01-1707:09:32.748|true |
|9101|CCCC|3333|caxad|fsdaa|1970-01-0100:00:00.0  |false|
+----+----+----+-----+-----+----------------------+-----+
 类似资料:
  • 输入DF: 我试图找到运行时间戳的差异就main_id 输出DF: 已尝试的代码: 我得到的差异函数的错误,有一种方法来实现这一点。任何建议请。

  • 我有到UTC的秒偏移量,数据库中存储的所有时间戳都是UTC,如何在选择查询期间应用偏移量, 我想在UTC时间插入数据库并在本地时间检索。 在PHP上: 我可以在select的mysql运行时中减去unix\u timestamp字段$user\u timezone\u offset吗? 我正在插入UTC,这显然是我所需要的,但为了选择用户的本地时间,我需要以某种方式将偏移量应用于存储在数据库中的时

  • 我需要将两个数据集与CLOSE时间戳连接起来。第一个数据集是来自移动应用程序的日记数据集: 在这里: 第二个数据集是来自加速度计日志的数据集,显示移动(=INVH)或空闲(=NIVH): 在这里: 我需要根据时间戳字段之间的时间差连接两个数据帧。例如,在df1上留下join,以查看应用程序日志数据如何与实际加速度计日志一致。简单的左连接在这里不起作用,因为在大多数情况下有一个滞后时间。所以我的问题

  • 问题内容: 我在玩JPA(具体来说是Eclipselink)。下面的实体具有一个时间戳,应该在该实体上次更新时反映该时间戳。 每次更改此实体时,使JPA自动更新该时间戳的策略是什么? 如果我还想要一个“创建”时间戳记,该时间戳记仅在实体首次保留时设置,而永远不允许再次更改,该怎么办? 问题答案: 使用@PrePersist和@PreUpdate批注并编写您自己的事件侦听器。 详细了解一下此答案。它

  • 我正在寻找一种方法,将我的大型spark数据集划分为组/批,并在某些函数中处理这组行。所以基本上,这组行应该被输入到我的函数中,输出是我的单位,因为我不想聚合或更新输入记录,只是执行一些计算。 为了理解,假设我有以下输入。 假设我需要按col1和col2分组,这将给我以下分组 (1, A,1),(1, A,4),(1, A,5)--- (1,B,2)--- (1,C,3),(1,C,6)--- (

  • 问题内容: 对于某些Hibernate实体,我们需要存储其创建时间和上次更新时间。你将如何设计? 你将在数据库中使用什么数据类型(假设使用MySQL,可能与JVM时区不同)?数据类型是否支持时区? 你会在Java中使用什么数据类型(Date,Calendar,long,…)? 你将由谁负责设置时间戳(数据库,ORM框架(hibernate)或应用程序程序员)? 你将使用哪些注释进行映射(例如@Te