当前位置: 首页 > 知识库问答 >
问题:

scala dataframe中的collect_list,它将在固定列号的间隔内收集行

南门欣怡
2023-03-14

我要求将特定分区的所有行收集到DataFrame中的单个行中。我必须将此数据frame转储到cosmosDB中,每个文档只能容纳2MB的数据。但是当我将上面的数据集收集到一行中时,它超过了2MB,并且在写入CosmosDB时抛出了错误。

我想把行收集成一个固定的500行间隔。对于一个分区,前500行应该被收集到一行中,后500行应该被收集到另一行中,依此类推。

输入数据如下所示。

+------+----------+---------------------------------------+
|ID    |TIME      |SGNL                                   |
+------+----------+---------------------------------------+
|00001 |1574360355|{"SN":"Acc","ST":1574360296,"SV":"0.0"}|
|00001 |1574360355|{"SN":"Acc","ST":1574360296,"SV":"0.0"}|
|00001 |1574360355|{"SN":"Acc","ST":1574360296,"SV":"0.0"}|
|00001 |1574360355|{"SN":"Acc","ST":1574360297,"SV":"0.0"}|
|00001 |1574360355|{"SN":"Acc","ST":1574360297,"SV":"0.0"}|
|00001 |1574360355|{"SN":"Acc","ST":1574360297,"SV":"0.0"}|
|00001 |1574360355|{"SN":"Acc","ST":1574360298,"SV":"0.0"}|
+------+----------+---------------------------------------+

我尝试了如下所示,但有些行的大小超过了2MB,无法写入CosmosDB:

val newDF = df.groupBy($"ID", $"TIME").agg(collect_list($"SGNL").as("SGNL"))
+------+----------+---------------------------------------------------------------------------------------------------------------------------------------------------------------+
|ID    |TIME      |SGNL                                                                                                                                                           |
+------+----------+---------------------------------------------------------------------------------------------------------------------------------------------------------------+
|00001 |1574360355|{"SN":"Acc","ST":1574360296,"SV":"0.0"},{"SN":"Acc","ST":1574360296,"SV":"0.0"},{"SN":"Acc","ST":1574360296,"SV":"0.0"},.......................................|
|00002 |1574360355|{"SN":"Acc","ST":1574360297,"SV":"0.0"},{"SN":"Acc","ST":1574360297,"SV":"0.0"},{"SN":"Acc","ST":1574360298,"SV":"0.0"},.......................................| 
|00003 |1574360355|{"SN":"Acc","ST":1574360297,"SV":"0.0"},{"SN":"Acc","ST":1574360297,"SV":"0.0"},{"SN":"Acc","ST":1574360298,"SV":"0.0"},.......................................| 
|00004 |1574360355|{"SN":"Acc","ST":1574360297,"SV":"0.0"},{"SN":"Acc","ST":1574360297,"SV":"0.0"},{"SN":"Acc","ST":1574360298,"SV":"0.0"},.......................................|                                        |
+------+----------+---------------------------------------------------------------------------------------------------------------------------------------------------------------+
+------+----------+--------------------------------------------+
|ID    |TIME      |SGNL                                        |
+------+----------+--------------------------------------------+
|00001 |1574360355|{1st ROW},{2nd ROW},......{500th ROW}       |
|00001 |1574360355|{501st ROW},{502nd ROW},......{1000th ROW}  |
|00001 |1574360355|{1001st ROW},{1002nd ROW},......{1500th ROW}|
|00001 |1574360355|{1501st ROW},{1502nd ROW},......{2000th ROW}|
|..............................................................|
|..............................................................|

|00002 |1574360355|{1st ROW},{2nd ROW},......{500th ROW}       |
|00002 |1574360355|{501st ROW},{502nd ROW},......{1000th ROW}  |
|00002 |1574360355|{1001st ROW},{1002nd ROW},......{1500th ROW}|
|00002 |1574360355|{1501st ROW},{1502nd ROW},......{2000th ROW}|
|..............................................................|
+------+----------+---------------------------------------------+

共有1个答案

夏侯弘量
2023-03-14

您可以使用monitonicallyincreasingid构建索引,将其除以500并按该索引进行分组。它将把前500行放在一起,下500行放在一起,以此类推。

df.withColumn("fancy_id", floor(monotonicallyIncreasingId / 500))
  .groupBy("fancy_id")
  .agg(collect_list($"SGNL").as("SGNL"))
  .drop("fancy_id") // if you want to get rid of the artificial id.

如果不想混合ID列,可以使用groupby(“id”,“fancy_id”)

然而,每个ID的第一组不一定是500。例如,您将得到如下内容:(id1,500元素)、(id1,320元素)、(id2,180元素)、(id2,500元素)、(id2,500元素)、(id2,50元素)、(id3,450元素)...

如果您更喜欢使用(id1,500元素)、(id1,320元素)、(id2,500元素)、(id2,500元素)、(id2,10元素)、(id3,500元素)、(id3,5元素)...,其中第一组总是有500个元素,则可以使用一个窗口:

val w = Window.partitionBy('ID).orderBy('fancy_id)
df.withColumn("fancy_id", monotonicallyIncreasingId)
  .withColumn("rank", rank() over w)
  .groupBy($"ID", floor($"rank" / 500))
  .agg(collect_list($"SGNL").as("SGNL"))
 类似资料:
  • 问题内容: 我使用的是Python 3,我想编写一个程序,在一定时间内要求多个用户输入。这是我的尝试: 问题是,即使时间到了,代码仍然等待输入。我希望循环在时间用完时停止。我该怎么做呢?谢谢! 问题答案: 此解决方案与 平台无关, 并且会 立即 中断键入以告知有关现有超时的信息。不必等到用户按下ENTER键就可以发现发生了超时。除了及时通知用户之外,这还可以确保在进一步处理超时后不再输入任何内容。

  • 我有两个间隔序列。 第一个是固定的,不重叠的,所以类似于: 第二个不是固定的,所以它只是间隔长度的有序列表: 手头的任务是: 将第二个列表中的每个间隔放置在给定的起点,以便第二个列表成为一个固定的、不重叠的间隔列表,就像第一个列表一样,同时保持其顺序 查找使某个列表中某个间隔内的整数数量最少,但不在另一个列表的任何间隔内的整数数量最少的对齐方式 很简单的例子: 最佳解决方案是将长度为10的区间放在

  • 与其查询具有开始和结束日期的间隔列表,不如从列表中检索仅与搜索开始和结束日期重叠的所有间隔,最好的方法是: 使用整数示例,以整数间隔列表为例。在此列表中,以下是所有唯一的间隔集,其中每组中的每个间隔都相互重叠: 以下是DateInterval的类: 我将收到按开始时间排序的间隔列表,如下所示: (在这个示例列表中,开始日期和结束日期总是均匀地落在一小时上。但是,它们可以落在任何一秒上(或者毫秒)。

  • 假设我有一个这样的范围列表 现在我想找到一个范围,比如。我的算法应该给我这个范围的所有范围。例如,这个的输出应该是 <代码>输出-[1,3]、[2,5]、[4,6]、[8,10] 我该如何着手解决这个问题? PS:我知道段树可能会有所帮助。我可以在其中构建树来存储间隔并查询位于间隔内的Point,但如何在给定间隔的情况下获取所有间隔。

  • 我有一个Java应用程序,它使用Prometheus库,以便在执行期间收集度量。稍后,我将Prometheus服务器链接到Grafana,以便可视化这些度量。我想知道是否可以让格拉法纳为这些度量显示一个自定义的X轴?通常的X轴是在当地时间。我能让它显示带有GPS/UTC时间戳的数据吗?有可能吗?如果是,需要什么?保存时间戳的附加度量参数? 我这样声明度量变量: 并添加如下所示的数据: 如有任何帮助

  • 假设您有一个间隔列表,例如[(0 4),(1 3),(2 5),(2 6)]。此列表未排序。然后给您一个范围,如[1 5]。您必须返回适合范围内的间隔数。在这个问题中,它将返回2。((1 3)和(2 5)) 间隔列表保持不变,但我们最多得到100000个查询,每个查询由一个范围组成。对于每个范围查询,我们必须返回适合其中的间隔数。 在研究之后,我读到了间隔树。但是,您只能查询与任何给定范围重叠的间