当前位置: 首页 > 知识库问答 >
问题:

火花窗口函数之间的范围产生不正确的结果

孙博艺
2023-03-14

我试图在Spark DataFrame上使用RangeBetween对Long类型的列执行窗口函数,但窗口的结果不正确。我做错什么了吗?

val rowsRdd: RDD[Row] = spark.sparkContext.parallelize(
      Seq(
        Row("2014-11-01 08:10:10.12345", 141482941012345L),
        Row("2014-11-01 09:10:10.12345", 141483301012345L),
        Row("2014-11-01 10:10:10.12345", 141483661012345L),
        Row("2014-11-02 10:10:10.12345", 141492301012345L),
        Row("2014-11-03 10:10:10.12345", 141500941012345L),
        Row("2014-11-04 10:10:10.12345", 141509581012345L),
        Row("2014-11-05 10:10:10.12345", 141518221012345L),
        Row("2014-11-06 10:10:10.12345", 141526861012345L),
        Row("2014-11-07 10:10:10.12345", 141535501012345L),
        Row("2014-11-08 10:10:10.12345", 141544141012345L)
      )
    )
val schema = new StructType()
  .add(StructField("dateTime", StringType, true))
  .add(StructField("unixTime", LongType, true))

val df = spark.createDataFrame(rowsRdd, schema)
df.show(10, false)
df.printSchema()
+-------------------------+---------------+
|dateTime                 |unixTime       |
+-------------------------+---------------+
|2014-11-01 08:10:10.12345|141482941012345|
|2014-11-01 09:10:10.12345|141483301012345|
|2014-11-01 10:10:10.12345|141483661012345|
|2014-11-02 10:10:10.12345|141492301012345|
|2014-11-03 10:10:10.12345|141500941012345|
|2014-11-04 10:10:10.12345|141509581012345|
|2014-11-05 10:10:10.12345|141518221012345|
|2014-11-06 10:10:10.12345|141526861012345|
|2014-11-07 10:10:10.12345|141535501012345|
|2014-11-08 10:10:10.12345|141544141012345|
+-------------------------+---------------+
root
 |-- dateTime: string (nullable = true)
 |-- unixTime: long (nullable = true)

第一列是事件的时间戳(字符串,我们在实践中不会使用它),第二列是时间戳对应的unix时间,单位为10E-5秒。

现在,我想计算当前行的窗口中的事件数。例如,在3小时窗口中,我做:

val hour: Long = 60*60*100000L
val w = Window.orderBy(col("unixTime")).rangeBetween(-3*hour, 0)
val df2 = df.withColumn("cts", count(col("dateTime")).over(w)).orderBy(asc("unixTime"))

正确返回:

+-------------------------+---------------+---+
|dateTime                 |unixTime       |cts|
+-------------------------+---------------+---+
|2014-11-01 08:10:10.12345|141482941012345|1  |
|2014-11-01 09:10:10.12345|141483301012345|2  |
|2014-11-01 10:10:10.12345|141483661012345|3  |
|2014-11-02 10:10:10.12345|141492301012345|1  |
|2014-11-03 10:10:10.12345|141500941012345|1  |
|2014-11-04 10:10:10.12345|141509581012345|1  |
|2014-11-05 10:10:10.12345|141518221012345|1  |
|2014-11-06 10:10:10.12345|141526861012345|1  |
|2014-11-07 10:10:10.12345|141535501012345|1  |
|2014-11-08 10:10:10.12345|141544141012345|1  |
+-------------------------+---------------+---+
val hour: Long = 60*60*100000L
val w = Window.orderBy(col("unixTime")).rangeBetween(-6*hour, 0)
val df2 = df.withColumn("cts", count(col("dateTime")).over(w)).orderBy(asc("unixTime"))

+-------------------------+---------------+---+
|dateTime                 |unixTime       |cts|
+-------------------------+---------------+---+
|2014-11-01 08:10:10.12345|141482941012345|0  |
|2014-11-01 09:10:10.12345|141483301012345|0  |
|2014-11-01 10:10:10.12345|141483661012345|0  |
|2014-11-02 10:10:10.12345|141492301012345|0  |
|2014-11-03 10:10:10.12345|141500941012345|0  |
|2014-11-04 10:10:10.12345|141509581012345|0  |
|2014-11-05 10:10:10.12345|141518221012345|0  |
|2014-11-06 10:10:10.12345|141526861012345|0  |
|2014-11-07 10:10:10.12345|141535501012345|0  |
|2014-11-08 10:10:10.12345|141544141012345|0  |
+-------------------------+---------------+---+
val hour: Long = 60*60*100000L
val w = Window.orderBy(col("unixTime")).rangeBetween(-12*hour, 0)
val df2 = df.withColumn("cts", count(col("dateTime")).over(w)).orderBy(asc("unixTime"))

+-------------------------+---------------+---+
|dateTime                 |unixTime       |cts|
+-------------------------+---------------+---+
|2014-11-01 08:10:10.12345|141482941012345|1  |
|2014-11-01 09:10:10.12345|141483301012345|1  |
|2014-11-01 10:10:10.12345|141483661012345|1  |
|2014-11-02 10:10:10.12345|141492301012345|1  |
|2014-11-03 10:10:10.12345|141500941012345|1  |
|2014-11-04 10:10:10.12345|141509581012345|1  |
|2014-11-05 10:10:10.12345|141518221012345|1  |
|2014-11-06 10:10:10.12345|141526861012345|1  |
|2014-11-07 10:10:10.12345|141535501012345|1  |
|2014-11-08 10:10:10.12345|141544141012345|1  |
+-------------------------+---------------+---+

和这个ISSU有关吗?[SPARK-19451][SQL]rangeBetween方法应接受长值作为边界#18540。它是否已经在最新版本的Spark中实现了?

共有1个答案

魏泰
2023-03-14

确实与挂钩问题有关。6*hour是2160000000,大于integer.max_value(2147483647),因此会导致整数溢出:

scala> (6 * hour).toInt
res4: Int = -2134967296

这个问题已经在当前的master上解决,并将在Spark2.3中发布。

 类似资料:
  • 问题内容: 因此,我在此代码位中的目标是随机掷两个骰子,众所周知,您的普通骰子只有6个面,因此我导入了Foundation以访问arc4random_uniform(UInt32)。我尝试使用(1..7)的范围来避免随机获得0,但是返回了一个我不太喜欢的错误。我试图这样做: 但是那又回来了 找不到接受提供的参数的’init’的重载 我希望这是足够的信息,可以为您提供帮助,帮助您:) 请注意,我只是

  • [新加入Spark]语言-Scala 根据文档,RangePartitioner对元素进行排序并将其划分为块,然后将块分发到不同的机器。下面的例子说明了它是如何工作的。 假设我们有一个数据框,有两列,一列(比如“a”)的连续值从1到1000。还有另一个数据帧具有相同的模式,但对应的列只有4个值30、250、500、900。(可以是任意值,从1到1000中随机选择) 如果我使用RangePartit

  • 我有一个类似于以下内容的DataFrame(sqlDF)(在本例中进行了简化),其中我试图删除在另一行的开始日期和结束日期范围内具有start_date和end_date的所有行: 首先,最终用户要求我删除start_date和end_date之间间隔小于5天的所有记录,我使用了以下方法: 从而产生如下所示的数据frame: 现在,我需要筛选出开始日期和结束日期在同一id的另一行开始日期和结束日期

  • 我注意到,当我在DataFrame上使用窗口函数后,如果我用函数调用map()时,Spark会返回一个“Task not serializable”异常这是我的代码: 这是堆栈跟踪: 异常:任务不可序列化在org.apache.spark.util.ClosureCleaner$.EnsureClealizable(ClosureCleaner.scala:304)在org.apache.spar

  • 我尝试过用这个方法来计算累积值,但是如果日期字段与累积字段中的值相同,那么有人能提出类似于这个问题的解决方案吗

  • 我有两个名为“alarm”和“interprise”的流,它们包含JSON。如果警报器和干预器连接,那么它们将具有相同的钥匙。我想联系他们来检测24小时前没有干预的所有警报。 但这个程序不起作用,结果给我的所有警报就好像24小时前没有干预一样。我重新检查了我的数据集5次,有些警报在警报日期前24小时内进行了干预。 这张图片说明了情况:在此处输入图像描述 因此我需要知道警报之前是否有干预。 程序代码