当前位置: 首页 > 知识库问答 >
问题:

从数据集中同时聚合多个字段

乜清野
2023-03-14

我有以下方案的数据:

sourceip
destinationip
packets sent

我想从这些数据中计算出几个聚合字段,并具有以下模式:

ip 
packets sent as sourceip
packets sent as destination

在RDD的快乐日子里,我可以使用聚合,定义{ip-的映射

在Dataset/Dataframe聚合中不再可用,而是可以使用UDAF,不幸的是,从我使用UDAF的经验来看,它们是不可变的,这意味着它们不能使用(必须在每次映射更新时创建一个新实例)示例解释在这里

一方面,从技术上讲,我可以将数据集转换为RDD、聚合等,然后返回数据集。我预计这会导致性能下降,因为数据集更加优化。由于复制,UDAF是不可能的。

是否有其他方法来执行聚合?

共有3个答案

贡烨烁
2023-03-14

这是一个使用分解的pyspark版本。它更加冗长,但逻辑与flatMap版本完全相同,仅使用纯数据帧代码。

sc\
  .parallelize([("ip1", "ip2", 5), ("ip2", "ip3", 7), ("ip2", "ip1", 1), ("ip3", "ip2", 3)])\
  .toDF(("from", "to", "p"))\
  .select(F.explode(F.array(\
      F.struct(F.col("from").alias("ip"),\
               F.col("p").alias("received"),\
               F.lit(0).cast("long").alias("sent")),\
      F.struct(F.col("to").alias("ip"),\
               F.lit(0).cast("long").alias("received"),\
               F.col("p").alias("sent")))))\
  .groupBy("col.ip")\
  .agg(F.sum(F.col("col.received")).alias("received"), F.sum(F.col("col.sent")).alias("sent"))

// +---+----+--------+
// | ip|sent|received|
// +---+----+--------+
// |ip2|   8|       8|
// |ip3|   3|       7|
// |ip1|   5|       1|
// +---+----+--------+
步建茗
2023-03-14

在没有任何自定义聚合的情况下进行它的一种方法是使用平面图(或爆炸用于数据帧),如下所示:

case class Info(ip : String, sent : Int, received : Int)
case class Message(from : String, to : String, p : Int)
val ds = Seq(Message("ip1", "ip2", 5), 
             Message("ip2", "ip3", 7), 
             Message("ip2", "ip1", 1), 
             Message("ip3", "ip2", 3)).toDS()

ds
    .flatMap(x => Seq(Info(x.from, x.p, 0), Info(x.to, 0, x.p)))
    .groupBy("ip")
    .agg(sum('sent) as "sent", sum('received) as "received")
    .show


// +---+----+--------+
// | ip|sent|received|
// +---+----+--------+
// |ip2|   8|       8|
// |ip3|   3|       7|
// |ip1|   5|       1|
// +---+----+--------+

就性能而言,我不确定与自定义聚合相比,flatMap是否是一种改进。

百里芷阳
2023-03-14

听起来您需要一个标准的熔化(如何熔化Spark数据帧?)和枢轴组合:

val df = Seq(
  ("192.168.1.102", "192.168.1.122", 10),
  ("192.168.1.122", "192.168.1.65", 10),
  ("192.168.1.102", "192.168.1.97", 10)
).toDF("sourceip", "destinationip", "packets sent")


df.melt(Seq("packets sent"), Seq("sourceip", "destinationip"), "type", "ip")
  .groupBy("ip")
  .pivot("type", Seq("sourceip", "destinationip"))
  .sum("packets sent").na.fill(0).show

// +-------------+--------+-------------+             
// |           ip|sourceip|destinationip|
// +-------------+--------+-------------+
// | 192.168.1.65|       0|           10|
// |192.168.1.102|      20|            0|
// |192.168.1.122|      10|           10|
// | 192.168.1.97|       0|           10|
// +-------------+--------+-------------+
 类似资料:
  • 给定一个数据帧say,包含100列和100行,我需要列的子集。我想同时索引两个(或多个)列块。 例如,我想要的是: (显然,这不起作用) 但是我想要第1列到第20列以及第55列到第57列。我可以用两个单独的操作来实现这一点,并将它们连接起来,但我想知道是否有一个功能可以让这一点只需一次(我知道R有这个功能,因此好奇)。 编辑:我在这里发现了一个类似的问题,但答案是字符串。可以进行多个子集设置的解决

  • 这个问题不是如何通过多个字段进行聚合,我们可以使用子聚合。 如果你知道SQL,我可以给你一个完美的解释: 我们能在Elasticsearch中实现这一点吗? 谢谢。

  • 我在elasticsearch中有一个文档索引,每个文档有480个字段。我试图做的是搜索一个词(例如“Apple”),并获得所有其值与搜索词匹配的唯一字段名。所以如果我的文档是: 作为查询的结果,我希望得到如下所示的聚合: 由于每个文档都有480个字段,所以我更喜欢执行multi_match查询,而不是使用包含所有字段的筛选器: 这个查询在ElasticSearch中可能吗?

  • 第一个名为的文档包含以下文档(不包括): 第二个集合名为,具有以下文档: 上的 预期的结果是: 如何使用聚合查询来实现这一点?

  • 我有多个数据集,每个数据集中有不同数量的图像(和不同的图像维度)。在训练循环中,我想从所有数据集中随机加载一批图像,但每个批次只包含单个数据集中的图像。例如,我有数据集A、B、C、D,每个数据集都有图像01。jpg,02。jpg,…n.jpg(其中n取决于数据集),假设批量大小为3。例如,在第一个加载的批次中,我可能会在下一个批次[D/01.jpg,D/05.jpg,D/12.jpg]中获得图像[