当前位置: 首页 > 知识库问答 >
问题:

不一致计数后窗口引导功能,和过滤器

江正德
2023-03-14

编辑2:

我已经报告了这作为一个问题火花开发人员,我会在这里发布状态当我得到一些。

我有一个问题已经困扰我很长一段时间了。

    null
import org.apache.spark.sql.expressions.Window
val w = Window.partitionBy("user_id").orderBy("start")
val ts_lead = coalesce(lead("start", 1) .over(w), date_add(col("start"), 1))

val df2 = df1.
    withColumn("end", ts_lead).
    withColumn("duration", col("end").cast("long")-col("start").cast("long"))

df2.where("type='B' and duration>4").count()

  • 运行1:19359949
  • 运行2:19359964

如果我单独运行每个过滤器,一切都很好,我得到一致的结果。但如果我把它们结合起来,就不一致了。

我试着过滤来分离数据流,先持续时间再类型,反之亦然,也没有乐趣。我知道我可以缓存或检查点datframe,但它是非常大的dataset,而且我多次进行类似的计算,所以我不能真正为检查点和缓存腾出时间和磁盘空间。

import org.apache.spark.sql.expressions.Window

val getRandomUser = udf(()=>{
    val users = Seq("John","Eve","Anna","Martin","Joe","Steve","Katy")
   users(scala.util.Random.nextInt(7))
})

val getRandomType = udf(()=>{
    val types = Seq("TypeA","TypeB","TypeC","TypeD","TypeE")
    types(scala.util.Random.nextInt(5))
})

val getRandomStart = udf((x:Int)=>{
    x+scala.util.Random.nextInt(47)
})

for( a <- 0 to 23){
        // use iterator a to continue with next million, repeat 1 mil times
        val x=Range(a*1000000,(a*1000000)+1000000).toDF("id").
            withColumn("start",getRandomStart(col("id"))).
            withColumn("user",getRandomUser()).
            withColumn("type",getRandomType()).
            drop("id")

        x.write.mode("append").orc("s3://your-bucket/random.orc")
}

val w = Window.partitionBy("user").orderBy("start")
val ts_lead = coalesce(lead("start", 1) .over(w), lit(30000000))

val fox2 = spark.read.orc("s3://your-bucket/random.orc").
    withColumn("end", ts_lead).
    withColumn("duration", col("end")-col("start"))

// repeated executions of this line returns different results for count 
fox2.where("type='TypeA' and duration>4").count()

  • 运行1:2551259
  • 运行2:2550756
  • 运行3:2551279

每次运行不同计数

共有1个答案

颛孙镜
2023-03-14

我已经在当地复制了你的问题。

据我所知,问题在于您在这句话中是按duration进行筛选:fox2.where(“type='typea'and duration>4”).count()

并且持续时间是随机生成的。我知道您使用的是种子,但如果将其并行化,则不知道将向每个ID添加哪个随机值。

1 + 21
2 + 14
3 + 5
4 + 17
1 + 21
4 + 14
3 + 5
2 + 17
 类似资料:
  • 问题内容: 我正在使用SQL Server 2012构建库存计划/重新排序引擎。 我有一堆过时的交易,称它们为贷方和借方。我想一次做两件事: 生成运行总计(每日净余额) 生成补充建议。补充将重置“总计”(在“#1”中)为零。 该表如下所示: 我正在使用SQL 2012 SUM OVER()窗口函数来显示这些的运行总数。 我需要找到一种方法将运行总计(又称RT)重置为零(如果下降到零以下)。 我的查

  • 问题内容: 这是该问题的后续内容,其中对我的查询进行了改进,使其使用窗口函数而不是联接内的聚合。虽然查询现在快得多,但我发现结果不正确。 我需要在x年尾随时间框架上执行计算。例如,每行的计算方法是:十年前移至当前行,然后除以结果。为了简单起见,我们将使用1年。 SQL Fiddle对此问题进行了解答。(Postgres 9.6) 作为一个简单的例子,和用于可像这样分别计算: 要做到这一点 ,每行

  • 在本章中,我们将研究Twig Filters and Functions 。 过滤器也用于根据需要使用所需的输出格式化数据。 函数用于生成内容。 Twig模板是包含由值替换的表达式和变量的文本文件。 Twig使用三种类型的标签。 Output tags - 以下语法用于在此处显示已计算的表达式结果。 {{ Place Your Output Here }} Action Tags - 以下语

  • 问题内容: 我正在探索Hive中的窗口功能,并且能够理解所有UDF的功能。虽然,我无法理解我们与其他功能配合使用的分区和顺序。以下是与我计划构建的查询非常相似的结构。 只是试图了解两个关键字都涉及的后台过程。 感谢帮助:) 问题答案: 分析函数为数据集中每个分区的每一行分配一个等级。 子句确定行的分布方式(如果是配置单元,则在缩减程序之间)。 确定行在分区中的排序方式。 第一阶段由分配 ,数据集中

  • 问题内容: 好的,起初这只是和我的一个朋友开玩笑,但后来变成了有趣的技术问题:) 我有下表: 该表包含我所有东西的记录,并分别具有数量和优先级(我需要多少)。 我有一个指定体积的袋子,例如。我想从表中选择所有可以放入袋子的东西,首先包装最重要的东西。 这似乎是使用窗口函数的情况,所以这是我想出的查询: 但是,问题在于Postgres抱怨: 如果我删除此过滤器,则会正确计算总列,对结果进行正确排序,

  • 下面是一些示例代码: 这是我希望看到的输出: