当前位置: 首页 > 知识库问答 >
问题:

如何用Scala在Spark中做滑动窗口排序?

裴劲
2023-03-14
+-----+-------------------+---------------------+------------------+
|query|similar_queries    |model_score          |count             |
+-----+-------------------+---------------------+------------------+
|shirt|funny shirt        |0.0034038130658784866|189.0             |
|shirt|shirt womens       |0.0019435265241921438|136.0             |
|shirt|watch              |0.001097496453284101 |212.0             |
|shirt|necklace           |6.694577024597908E-4 |151.0             |
|shirt|white shirt        |0.0037413097560623485|217.0             |
|shirt|shoes              |0.0022062579255572733|575.0             |
|shirt|crop top           |9.065831060804897E-4 |173.0             |
|shirt|polo shirts for men|0.007706416273211698 |349.0             |
|shirt|shorts             |0.002669621942466027 |200.0             |
|shirt|black shirt        |0.03264296242546658  |114.0             |
+-----+-------------------+---------------------+------------------+
lazy val countWindowByFreq = Window.partitionBy(col(QUERY)).orderBy(col(COUNT).desc)
val ranked_data = data.withColumn("count_rank", row_number over countWindowByFreq)

+-----+-------------------+---------------------+------------------+----------+
|query|similar_queries    |model_score          |count             |count_rank|
+-----+-------------------+---------------------+------------------+----------+
|shirt|shoes              |0.0022062579255572733|575.0             |1         |
|shirt|polo shirts for men|0.007706416273211698 |349.0             |2         |
|shirt|white shirt        |0.0037413097560623485|217.0             |3         |
|shirt|watch              |0.001097496453284101 |212.0             |4         |
|shirt|shorts             |0.002669621942466027 |200.0             |5         |
|shirt|funny shirt        |0.0034038130658784866|189.0             |6         |
|shirt|crop top           |9.065831060804897E-4 |173.0             |7         |
|shirt|necklace           |6.694577024597908E-4 |151.0             |8         |
|shirt|shirt womens       |0.0019435265241921438|136.0             |9         |
|shirt|black shirt        |0.03264296242546658  |114.0             |10        |
+-----+-------------------+---------------------+------------------+----------+

在第一个窗口row_number1到4中,新的秩(新列)将是

1. polo shirts for men
2. white shirt
3. shoes
4. watch

在第一个窗口row_number5到8中,新的秩(新列)将是

5. funny shirt
6. shorts
7. shirt womens 
8. crop top

在第一个窗口中,Row_Number9要Rest,新的秩(新列)将是

9. black shirt 
10. shirt womens

但这给了我:

sql.AnalysisException: Window Frame ROWS BETWEEN CURRENT ROW AND 3 FOLLOWING must match the required frame ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW;

此外,尝试了。rowsbetween(-3,0)但这也给我带来了错误:

org.apache.spark.sql.AnalysisException: Window Frame ROWS BETWEEN 3 PRECEDING AND CURRENT ROW must match the required frame ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW;

共有1个答案

金嘉
2023-03-14

由于您已经计算了count_rank,下一步是找到一种方法,将行分组为四组。可以这样做:

import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._

val ranked_data_grouped = ranked_data
  .withColumn("bucket", (($"count_rank" -1)/4).cast(IntegerType))

ranked_data_grouped将如下所示:

+-----+-------------------+---------------------+------------------+----------+-------+
|query|similar_queries    |model_score          |count             |count_rank|bucket |
+-----+-------------------+---------------------+------------------+----------+-------+
|shirt|shoes              |0.0022062579255572733|575.0             |1         |0      |
|shirt|polo shirts for men|0.007706416273211698 |349.0             |2         |0      |      
|shirt|white shirt        |0.0037413097560623485|217.0             |3         |0      |
|shirt|watch              |0.001097496453284101 |212.0             |4         |0      |
|shirt|shorts             |0.002669621942466027 |200.0             |5         |1      |
|shirt|funny shirt        |0.0034038130658784866|189.0             |6         |1      |
|shirt|crop top           |9.065831060804897E-4 |173.0             |7         |1      |
|shirt|necklace           |6.694577024597908E-4 |151.0             |8         |1      |
|shirt|shirt womens       |0.0019435265241921438|136.0             |9         |2      |
|shirt|black shirt        |0.03264296242546658  |114.0             |10        |2      |
+-----+-------------------+---------------------+------------------+----------+-------+

现在,您所要做的就是按bucket分区和按model_score:

val output = ranked_data_grouped
  .withColumn("finalRank", row_number().over(Window.partitionBy($"bucket").orderBy($"model_score".desc)))
 类似资料:
  • 阅读Spark method sortByKey: 是否可能只返回“N”个数量的结果。因此,与其返回所有结果,不如返回前10名。我可以将已排序的集合转换为数组,并使用方法,但既然这是一个O(N)操作,有没有更有效的方法?

  • 假设我有一个每1分钟开始的2小时窗口。下一步是应用GroupBy转换。 如果能解释这一点,我将不胜感激。无法真正找到相关信息

  • 我有以下要求: 从酒吧子主题读取事件 取一个持续时间为30分钟、周期为1分钟的窗口 在该窗口中,如果给定id的3个事件都匹配某个谓词,那么我需要在不同的pub子主题中引发一个事件 应该在第3个事件进入分组id时立即引发该事件,因为这是为了检测欺诈行为。在一个窗格中,有许多ID,其中有3个事件与我的谓词匹配,所以我可能需要在每个窗格中发出多个事件 由于滑动窗口重叠,输出PCollection包含重复

  • 我最初的想法是简单地迭代,如果我们想要平均每X天,X次,每次只需按日期分组元素,并有一个偏移量。 所以如果我们有这样的场景: 天数:1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 第2组:(6,F)(7,G)(8,H)(9,I)(10,J) 第3组:(11,K)(12,L)(13,M)(14,N)(15,O) 第二次迭代:

  • 介绍 将TCP与UDP这样的简单传输协议区分开来的是它传输数据的质量。TCP对于发送数据进行跟踪,这种数据管理需要协议有以下两大关键功能: 可靠性:保证数据确实到达目的地。如果未到达,能够发现并重传。 数据流控:管理数据的发送速率,以使接收设备不致于过载。 要完成这些任务,整个协议操作是围绕滑动窗口确认机制来进行的。因此,理解了滑动窗口,也就是理解了TCP。 更多信息 TCP面向流的滑动窗口确认机

  • 目前准备用redis实现分布式限流策略, 抛开那些redis插件,我觉得zset实现理论上可行, 具体逻辑可以是这样: 抽象限流逻辑: 针对某个action,需要在一段时间(即为窗口),只能容许N个操作 redis伪代码 问题: 假设某个行为有大量的不同样本或者不同实例在调用, 也就是key不一样, 假设有百万以上,但是每个key的行为发生次数有限,例如就一次, 这个时候就会出现很大冷数据 处理该