当前位置: 首页 > 面试题库 >

带有动态鞋last的Spark Advanced窗口

董砚
2023-03-14
问题内容

问题:给定一个时间序列数据(即用户活动的点击流)存储在配置单元中,要求使用Spark使用会话ID丰富数据。

会话定义

  • 闲置1小时后,工作阶段将终止
  • 会话保持活动状态,总计2个小时

数据:

click_time,user_id
2018-01-01 11:00:00,u1
2018-01-01 12:10:00,u1
2018-01-01 13:00:00,u1
2018-01-01 13:50:00,u1
2018-01-01 14:40:00,u1
2018-01-01 15:30:00,u1
2018-01-01 16:20:00,u1
2018-01-01 16:50:00,u1
2018-01-01 11:00:00,u2
2018-01-02 11:00:00,u2

以下是仅考虑会话定义中第一点的部分解决方案:

val win1 = Window.partitionBy("user_id").orderBy("click_time")
    val sessionnew = when((unix_timestamp($"click_time") - unix_timestamp(lag($"click_time",1,"2017-01-01 11:00:00.0").over(win1)))/60 >= 60, 1).otherwise(0)
    userActivity
      .withColumn("session_num",sum(sessionnew).over(win1))
      .withColumn("session_id",concat($"user_id", $"session_num"))
      .show(truncate = false)

实际输出:

+---------------------+-------+-----------+----------+
|click_time           |user_id|session_num|session_id|
+---------------------+-------+-----------+----------+
|2018-01-01 11:00:00.0|u1     |1          |u11       |
|2018-01-01 12:10:00.0|u1     |2          |u12       | -- session u12 starts
|2018-01-01 13:00:00.0|u1     |2          |u12       |
|2018-01-01 13:50:00.0|u1     |2          |u12       |
|2018-01-01 14:40:00.0|u1     |2          |u12       | -- this should be a new session as diff of session start of u12 and this row exceeds 2 hours
|2018-01-01 15:30:00.0|u1     |2          |u12       |
|2018-01-01 16:20:00.0|u1     |2          |u12       |
|2018-01-01 16:50:00.0|u1     |2          |u12       | -- now this has to be compared with row 5 to find difference
|2018-01-01 11:00:00.0|u2     |1          |u21       |
|2018-01-02 11:00:00.0|u2     |2          |u22       |
+---------------------+-------+-----------+----------+

为了包括第二个条件,我试图找出当前时间与上次会话开始时间之间的差异,以检查该时间是否超过了2小时,但是对于接下来的行,引用本身会发生变化。这些是一些可以通过运行总和实现的用例,但这在这里不合适。


问题答案:

这不是要解决的直截了当的问题,但这是一种方法:

  1. 使用窗口lag时间戳差异来确定0每个用户的会话(=会话开始)rule #1
  2. 对数据集进行分组以组合每个用户的时间戳比较列表
  3. 通过UDF处理时间戳比较列表以标识rule #2每个用户的会话并创建每个用户的所有会话ID
  4. 通过Spark扩展分组的数据集 explode

下面的示例代码:

import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window
import spark.implicits._

val userActivity = Seq(
  ("2018-01-01 11:00:00", "u1"),
  ("2018-01-01 12:10:00", "u1"),
  ("2018-01-01 13:00:00", "u1"),
  ("2018-01-01 13:50:00", "u1"),
  ("2018-01-01 14:40:00", "u1"),
  ("2018-01-01 15:30:00", "u1"),
  ("2018-01-01 16:20:00", "u1"),
  ("2018-01-01 16:50:00", "u1"),
  ("2018-01-01 11:00:00", "u2"),
  ("2018-01-02 11:00:00", "u2")
).toDF("click_time", "user_id")

def clickSessList(tmo: Long) = udf{ (uid: String, clickList: Seq[String], tsList: Seq[Long]) =>
  def sid(n: Long) = s"$uid-$n"

  val sessList = tsList.foldLeft( (List[String](), 0L, 0L) ){ case ((ls, j, k), i) =>
    if (i == 0 || j + i >= tmo) (sid(k + 1) :: ls, 0L, k + 1) else
       (sid(k) :: ls, j + i, k)
  }._1.reverse

  clickList zip sessList
}

请注意,foldLeftUDF中用于的累加器为的元组(ls, j, k),其中:

  • ls 是要返回的格式化会话ID的列表
  • jk分别用于将条件更改的时间戳记值和会话ID号继续进行到下一次迭代

步骤1

val tmo1: Long = 60 * 60
val tmo2: Long = 2 * 60 * 60

val win1 = Window.partitionBy("user_id").orderBy("click_time")

val df1 = userActivity.
  withColumn("ts_diff", unix_timestamp($"click_time") - unix_timestamp(
    lag($"click_time", 1).over(win1))
  ).
  withColumn("ts_diff", when(row_number.over(win1) === 1 || $"ts_diff" >= tmo1, 0L).
    otherwise($"ts_diff")
  )

df1.show
// +-------------------+-------+-------+
// |         click_time|user_id|ts_diff|
// +-------------------+-------+-------+
// |2018-01-01 11:00:00|     u1|      0|
// |2018-01-01 12:10:00|     u1|      0|
// |2018-01-01 13:00:00|     u1|   3000|
// |2018-01-01 13:50:00|     u1|   3000|
// |2018-01-01 14:40:00|     u1|   3000|
// |2018-01-01 15:30:00|     u1|   3000|
// |2018-01-01 16:20:00|     u1|   3000|
// |2018-01-01 16:50:00|     u1|   1800|
// |2018-01-01 11:00:00|     u2|      0|
// |2018-01-02 11:00:00|     u2|      0|
// +-------------------+-------+-------+

步骤2- 4

val df2 = df1.
  groupBy("user_id").agg(
    collect_list($"click_time").as("click_list"), collect_list($"ts_diff").as("ts_list")
  ).
  withColumn("click_sess_id",
    explode(clickSessList(tmo2)($"user_id", $"click_list", $"ts_list"))
  ).
  select($"user_id", $"click_sess_id._1".as("click_time"), $"click_sess_id._2".as("sess_id"))

df2.show
// +-------+-------------------+-------+
// |user_id|click_time         |sess_id|
// +-------+-------------------+-------+
// |u1     |2018-01-01 11:00:00|u1-1   |
// |u1     |2018-01-01 12:10:00|u1-2   |
// |u1     |2018-01-01 13:00:00|u1-2   |
// |u1     |2018-01-01 13:50:00|u1-2   |
// |u1     |2018-01-01 14:40:00|u1-3   |
// |u1     |2018-01-01 15:30:00|u1-3   |
// |u1     |2018-01-01 16:20:00|u1-3   |
// |u1     |2018-01-01 16:50:00|u1-4   |
// |u2     |2018-01-01 11:00:00|u2-1   |
// |u2     |2018-01-02 11:00:00|u2-2   |
// +-------+-------------------+-------+

还要注意,它click_time是“逐步通过”的2-4以便包含在最终数据集中。



 类似资料:
  • 首选框架是Spring Web Service,但也欢迎其他解决方案。 问候,

  • 我想使用早期触发逻辑进行窗口聚合(您可以认为聚合是由窗口关闭或特定事件触发的),我阅读了文档:https://ci . Apache . org/projects/flink/flink-docs-release-1.12/dev/stream/operators/windows . html # incremental-window-aggregate-with-aggregate functi

  • 我正在查看Spark SQL中的Spark DataFrame的窗口幻灯片函数。 我有一个包含列、和的数据表。

  • 我正在做一个项目,其中有两个选项卡。在first Tab的用户界面上有一个文本字段和一个按钮。在文本字段中,用户可以写到100之前的任何数字,当他按下按钮Next时,我的应用程序将转到下一个选项卡,在该选项卡上有一个带有三列a、B和C的Gridpane。 所以我想要的是用户将在第一个选项卡的文本字段中输入的任何数字,根据该数字,它应该在第二个选项卡的GridPane中添加相同数量的Textfiel

  • 问题内容: 在我的网站上,我正在创建发票功能。发票具有静态信息:公司信息和收件人信息。但是它也具有动态信息:小时数,描述,总金额等。客户可以使用上面的动态信息添加多行。 现在我的问题是,如何将其实现到数据库中? 目前,我有一个名为“发票”的表,其中的列将包含上面的所有信息。但是通过这种方式,行将具有不必要的信息,例如公司和收货人信息,而实际上每个发票只需要插入一次即可。 你们认为我将如何解决这个问

  • 我第一次使用JavaFX来尝试制作一个应用程序,我可以用它来演示一个带有按钮控件的简单动画。为此,我使用了BoarderPane作为初级阶段,左侧、右侧和底部都使用了GridPanes。 然而,对于中心,我需要能够画一个球内有一条线,我可以旋转不同的视图,同时能够动画,或至少快照移动内的线。 我试过用一个窗格做中心,但不起作用。我试着把它变成自己的场景和副场景,但都不起作用。我不能使用画布,因为它