当前位置: 首页 > 知识库问答 >
问题:

复杂条件下的Spark SQL窗口函数

臧彭亮
2023-03-14
scala> df.show(5)
+----------------+----------+
|       user_name|login_date|
+----------------+----------+
|SirChillingtonIV|2012-01-04|
|Booooooo99900098|2012-01-04|
|Booooooo99900098|2012-01-06|
|  OprahWinfreyJr|2012-01-10|
|SirChillingtonIV|2012-01-11|
+----------------+----------+
only showing top 5 rows
+----------------+----------+-------------+
|       user_name|login_date|became_active|
+----------------+----------+-------------+
|SirChillingtonIV|2012-01-04|   2012-01-04|
|Booooooo99900098|2012-01-04|   2012-01-04|
|Booooooo99900098|2012-01-06|   2012-01-04|
|  OprahWinfreyJr|2012-01-10|   2012-01-10|
|SirChillingtonIV|2012-01-11|   2012-01-11|
+----------------+----------+-------------+
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

val window = Window.partitionBy("user_name").orderBy("login_date")
val df2 = df.withColumn("tmp", lag("login_date", 1).over(window))

共有1个答案

潘修文
2023-03-14

火花>=3.2

最近的Spark版本为批处理和结构化流查询中的会话窗口提供了本机支持(请参见SPARK-10816及其子任务,特别是SPARK-34893)。

官方文档提供了很好的使用示例。

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{coalesce, datediff, lag, lit, min, sum}
val userWindow = Window.partitionBy("user_name").orderBy("login_date")
val userSessionWindow = Window.partitionBy("user_name", "session")
val newSession =  (coalesce(
  datediff($"login_date", lag($"login_date", 1).over(userWindow)),
  lit(0)
) > 5).cast("bigint")

val sessionized = df.withColumn("session", sum(newSession).over(userWindow))

查找每个会话的最早日期:

val result = sessionized
  .withColumn("became_active", min($"login_date").over(userSessionWindow))
  .drop("session")

数据集定义为:

val df = Seq(
  ("SirChillingtonIV", "2012-01-04"), ("Booooooo99900098", "2012-01-04"),
  ("Booooooo99900098", "2012-01-06"), ("OprahWinfreyJr", "2012-01-10"), 
  ("SirChillingtonIV", "2012-01-11"), ("SirChillingtonIV", "2012-01-14"),
  ("SirChillingtonIV", "2012-08-11")
).toDF("user_name", "login_date")

结果是:

+----------------+----------+-------------+
|       user_name|login_date|became_active|
+----------------+----------+-------------+
|  OprahWinfreyJr|2012-01-10|   2012-01-10|
|SirChillingtonIV|2012-01-04|   2012-01-04| <- The first session for user
|SirChillingtonIV|2012-01-11|   2012-01-11| <- The second session for user
|SirChillingtonIV|2012-01-14|   2012-01-11| 
|SirChillingtonIV|2012-08-11|   2012-08-11| <- The third session for user
|Booooooo99900098|2012-01-04|   2012-01-04|
|Booooooo99900098|2012-01-06|   2012-01-04|
+----------------+----------+-------------+
 类似资料:
  • 什么是 Nutz.Dao 中的复杂SQL条件 对于 Nutz.Dao 来说,它本质上就是将你的 Java 对象转化成 SQL,然后交给 JDBC 去执行。 而 SQL 中,当执行数据删除和查询操作时,最常用的就是 WHERE 关键字。 WHERE 关键字后面的就是所谓的复杂查询条件 Nutz.Dao 将如何如何使用这个条件 Dao 接口的 clear 方法和 query 方法的第二个参数,就是为了

  • 本文向大家介绍复杂的javascript窗口分帧解析,包括了复杂的javascript窗口分帧解析的使用技巧和注意事项,需要的朋友参考一下 什么是窗口分帧?       窗口分帧就是把一个浏览器文档窗口分隔成多个窗口,每个窗口都可以显示一个独立的网页文件,每个帧(即页面)都有自己的url。 帧窗口该如何创建?   帧通常是由<frameset>和<frame>标记创建的。但在HTML 4中,<if

  • 我需要一些使用jxls阅读器的帮助(http://jxls.sourceforge.net/reference/reader.html). 我对循环一行表示特定java对象的工作表很满意,但在嵌套循环的情况下,我被难倒了。 我需要将上述excel表映射到以下pojo: 我的问题是定义lookbreakcondition来读取内部对象。请注意,类对象的loopbreakcondition需要以下内容

  • 问题内容: 因此,这不是您的平均“条件排序依据”问题……我这里有一个非常棘手的问题。:-)我想允许我的存储过程为结果提供条件排序顺序。通常,可以通过以下方式完成此操作: 我想围绕实际/声明,但这不起作用。上面的方法起作用的原因是因为,当不等于给定值时,SQL Server会将语句转换为常数。因此,如果为0,则实际上有: 那么,第一个排序表达式什么也不做。之所以可行,是因为您可以在常规语句中在子句中

  • 函数可以处理传递给它的参数,并且能返回它的退出状态码给脚本,以便后续处理。 function_name $arg1 $arg2 函数通过位置来引用传递过来的参数(就好像它们是位置参数),例如,$1, $2,等等。 例子 24-2. 带参数的函数 #!/bin/bash # 函数和参数 DEFAULT=default # 默认参数值。D func2 () {

  • 我在声纳的认知复杂性错误,找到我的代码和附上的截图更多的参考。请帮我解决这个问题 有人能给我更新一下在三元运算符的情况下会有什么解决方案吗