当前位置: 首页 > 知识库问答 >
问题:

如何在Spark中的groupby之后运行RDD操作?

毛镜
2023-03-14

2)在控制器中执行groupby,然后在map中运行由外部库提供的非并行kmeans。

请告诉我还有别的办法,我宁愿尽可能的拥有一切。

共有1个答案

李永寿
2023-03-14

编辑:在响应的时候,我不知道它是pyspark。然而,我将把它作为一个可能被改编的想法

我有一个类似的问题,我能够提高性能,但对我来说仍然不是理想的解决方案。也许对你来说有用。

这个想法是将RDD分解成许多较小的RDDs(每个用户id一个新的RDD),将它们保存到一个数组中,然后为每个“子RDD”调用处理函数(在您的例子中是集群)。下面给出了建议的代码(注释中的解释):

// A case class just to use as example
case class MyClass(userId: Long, value: Long, ...)

// A Scala local array with the user IDs (Could be another iterator, such as List or Array):
val userList: Seq[Long] = rdd.map{ _.userId }.distinct.collect.toSeq  // Just a suggestion!

// Now we can create the new rdds:
val rddsList: Seq[RDD[MyClass]] = userList.map { 
  userId => rdd.filter({ item: MyClass => item.userId == userId }) 
}.toSeq

// Finally, we call the function we want for each RDD, saving the results in a new list. 
// Note the ".par" call, which is used to start the expensive execution for multiple RDDs at the same time
val results = rddsList.par.map {
  r => myFunction(r)
}
 类似资料:
  • 现在前3个整数是我需要广播的一些计数器。之后,所有行都具有相同的格式,如 我将在3个计数器后的所有这些值映射到一个新的RDD后,用它们在函数中做一些计算。但我无法理解如何分离前3个值,并正常映射其余值。 我的Python代码是这样的

  • 我有两个RDD说 RDD2基本上是使用范围(intial_value、end_value、间隔)生成的。这里的参数可以变化。大小可以与rdd1相同或不同。这个想法是基于使用过滤Criertia的rdd2值将记录从rdd1提取到rdd2(rdd1的记录可以在提取时重复,正如您在输出中看到的那样) 过滤条件rdd1。创建 预期产出: 现在我想根据一些使用RDD2键的条件过滤RDD1。(如上所述)并返回

  • 阅读Spark method sortByKey: 是否可能只返回“N”个数量的结果。因此,与其返回所有结果,不如返回前10名。我可以将已排序的集合转换为数组,并使用方法,但既然这是一个O(N)操作,有没有更有效的方法?

  • 我有以下RDD代表销售数据: 我试图制作一个,其中一个键是由SalesData中的一个(saleType saleDate)组成的字符串。实际上,我想过滤掉saleType saleDate相同的SalesData,然后返回,不带重复项。 我尝试了以下方法:首先将RDD映射到结构,其中键是(saleType saleDate),值是saleData。然后调用reduceByKey,选择某个键第一次

  • 问题内容: 例如,我有下表: 分组后: 我需要的是删除每个组中的行,其中列中的数量小于组中column的所有行中的最大值。好吧,我在将这个问题翻译和表达为英语时遇到了问题,因此这里是示例: 组中列中的行的最大值: 8 所以我想删除带有索引的行,并保留带有索引的行, 组中列中的行的最大值: 5 所以我想删除带有索引的行并保留带有索引的行 我尝试使用熊猫过滤器功能,但是问题是它一次在组中的所有行上运行

  • 假设我希望根据的对其进行分区。 通过覆盖方法对进行分区,并且只使用的hashcode是否正确? 但是,鉴于接受了许多分区参数,我不知道是否需要事先知道种类的数量,如果种类多于分区,会发生什么? 我的目标是打电话 并且在迭代器中只有具有相同的值。