当前位置: 首页 > 知识库问答 >
问题:

激发 UDF 未正确提供滚动计数

姬寂离
2023-03-14

我有一个Spark UDF来计算列的滚动计数,精确地说是wt时间。如果我需要计算24小时的滚动计数,例如以时间2020-10-02 09:04:00进入,我需要回看直到2020-10-01 09:04:00(非常精确)。

如果我在本地运行,滚动计数UDF工作良好,并给出正确的计数,但当我在集群上运行时,它给出的结果不正确。下面是示例输入和输出

输入

+---------+-----------------------+
|OrderName|Time                   |
+---------+-----------------------+
|a        |2020-07-11 23:58:45.538|
|a        |2020-07-12 00:00:07.307|
|a        |2020-07-12 00:01:08.817|
|a        |2020-07-12 00:02:15.675|
|a        |2020-07-12 00:05:48.277|
+---------+-----------------------+

预期产出

+---------+-----------------------+-----+
|OrderName|Time                   |Count|
+---------+-----------------------+-----+
|a        |2020-07-11 23:58:45.538|1    |
|a        |2020-07-12 00:00:07.307|2    |
|a        |2020-07-12 00:01:08.817|3    |
|a        |2020-07-12 00:02:15.675|1    |
|a        |2020-07-12 00:05:48.277|1    |
+---------+-----------------------+-----+

最后两个条目值在本地是4和5,但是在集群上它们是不正确的。我最大的猜测是数据分布在执行器之间,udf也在每个执行器上被并行调用。由于UDF的参数之一是列(在这个例子中是分区键-顺序名),如果是这样的话,我该如何控制/纠正集群的行为呢?以便以正确的方式为每个分区计算正确的计数。有什么建议吗

共有1个答案

有凯泽
2023-03-14

根据您的评论,您想要统计过去24小时内每条记录的记录总数

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.types.LongType

//A sample data (Guessing from your question)
val df = Seq(("a","2020-07-10 23:58:45.438","1"),("a","2020-07-11 23:58:45.538","1"),("a","2020-07-11 23:58:45.638","1")).toDF("OrderName","Time","Count")

// Extract the UNIX TIMESTAMP for your time column
val df2 = df.withColumn("unix_time",concat(unix_timestamp($"Time"),split($"Time","\\.")(1)).cast(LongType))

val noOfMilisecondsDay : Long = 24*60*60*1000

//Create a window per `OrderName` and select rows from `current time - 24 hours` to `current time` 
val winSpec = Window.partitionBy("OrderName").orderBy("unix_time").rangeBetween(Window.currentRow - noOfMilisecondsDay, Window.currentRow)

// Final you perform your COUNT or SUM(COUNT) as per your need
val finalDf = df2.withColumn("tot_count", count("OrderName").over(winSpec))

//or val finalDf = df2.withColumn("tot_count", sum("Count").over(winSpec))
 类似资料:
  • 问题内容: 迁移到Hibernate 5.2.7之后,我似乎在id字段中得到了不正确的值。 我的代码: Hibernate触发以下查询: 得出5367。表中id字段的最后一个值为5358。 我明白了 我相信这个问题类似于这和这个,但我不得不问,因为给出的解决方案有没有为我工作: 我加了 到我的persistence.xml,但无济于事。任何帮助将不胜感激。 问题答案: 实际上,当您迁移到新的Hib

  • 问题内容: 根据该Jetty指南中有关使用Keytool和OpenSSL的步骤3b(最后一步),我正在执行以下命令: 当我运行命令时,我得到: 你知道如何解决吗? 问题答案: 就我而言,我使用下载的Windows openSSL完成了一些步骤,而其他步骤则使用了CentOs6框上已经存在的openSSL。当我在CentOs / linux机器上执行所有步骤时,错误消失了。 次要的或许是注意linu

  • 我有一个编辑文本视图,可以在编辑文本中的提要上发表评论。当我在编辑文本中放入大量文本并出现滚动条时,我无法滚动编辑文本的文本内容<当我想在编辑文本中滚动时,代码>循环视图(code>Recyclerview)会向上滚动。我怎样才能解决这个问题。请提出建议。

  • 如文件中所述: “拥有REQUEST_IGNORE_BATTERY_OPTIMIZATIONS权限的应用程序可以触发一个系统对话框,让用户直接将应用程序添加到白名单中,而不需要进入设置。应用程序会触发一个ACTION_REQUEST_IGNORE_BATTERY_OPTIMIZATIONS来触发对话框。”

  • 我有一个Spring Boot应用程序和一个服务器静态网页控制器(React build): index.html位于:../resources/statig/index.html 也在application.yml中: 我有两个问题(问题2是主要问题): > 我必须调用以下结尾带有“/”的url:http://localhost:8100/test/I would like for http:/

  • > 我通过terraform创建了一个自签名tls证书和私钥。这些文件称为服务器。密钥和服务器。crt 我用这个证书和私钥创建了一个kubernetes tls机密:kubectl create secret tls dpaas secret-n dpaas prod-key server。密钥--cert server.crt 这工作正常,nginx入口ssl终止工作,以下kubectl命令:k