当前位置: 首页 > 知识库问答 >
问题:

过滤和合并时避免重新分配成本

易宣
2023-03-14

我正在 pyspark 中对 (x,y) 点的 RDD 实现范围查询。我将 xy 空间划分为一个 16*16 的网格(256 个单元格),并将 RDD 中的每个点分配给其中一个单元格。gridMappedRDD 是一个 PairRDD:(cell_id,Point 对象

我将这个RDD分区为256个分区,使用:

gridMappedRDD.partitionBy(256)

范围查询是一个矩形框。我为我的网格对象准备了一个方法,它可以返回与查询范围重叠的单元格id列表。所以,我用这个作为过滤器来删除不相关的单元格:

filteredRDD = gridMappedRDD.filter(lambda x: x[0] in candidateCells)

但问题是,在运行查询然后收集结果时,会评估所有 256 个分区;为每个分区创建一个任务。

为了避免这个问题,我尝试将过滤后的RDD合并为候选单元格列表的长度,我希望这可以解决问题。

filteredRDD.coalesce(len(candidateCells))

事实上,生成的RDD具有< code>len(candidateCells)分区,但这些分区与< code>gridMappedRDD不同。

正如在coalesce留档中所述,schffle参数是False,不应该在分区之间执行随机切换,但我可以看到(在Glom()的帮助下)情况并非如此。

例如,在candidateCells=[62,63,78,79]合并(4)之后,分区如下:

[[(62, P), (62, P) .... , (63, P)],
 [(78, P), (78, P) .... , (79, P)],
 [], []
]

实际上,通过合并,我有一个随机读取,它等于我的每个任务的整个数据集的大小,这需要很长时间。我需要的是一个RDD,只有与candidateCells中的单元格相关的分区,没有任何洗牌。所以,我的问题是,有没有可能只过滤一些分区而不重新洗牌?对于上面的例子,我的filteredRDD将有4个分区,其数据与originalRDD的62、63、78、79个分区完全相同。这样做,查询可以只针对受影响的分区。

共有1个答案

刘棋
2023-03-14

您在这里做出了一些不正确的假设:

  • 洗牌与合并无关(合并在这里也没有用)。它是由分区引起的。根据定义,分区需要随机播放。
  • 分区不能用于优化筛选器。Spark对你使用的功能一无所知(它是一个黑匣子)。
  • 分区
  • 不会唯一地将键映射到分区。多个键可以放在同一个分区上 - HashPartitioner 如何工作?

你能做什么:

>

  • 如果结果子集是小的重新分区,并对每个键应用查找

    from itertools import chain
    
    partitionedRDD = gridMappedRDD.partitionBy(256)
    
    chain.from_iterable(
        ((c, x) for x in partitionedRDD.lookup(c)) 
        for c in candidateCells
    )
    

    如果数据很大,您可以尝试跳过扫描分区(任务数量不会改变,但某些任务可能会短路):

    candidatePartitions = [
        partitionedRDD.partitioner.partitionFunc(c) for c in candidateCells
    ]
    
    partitionedRDD.mapPartitionsWithIndex(
        lambda i, xs: (x for x in xs if x[0] in candidateCells) if i in candidatePartitions else []
    )
    

    这两种方法只有在执行多次“查找”时才有意义。如果是一次性操作,最好执行线性滤波:

    • 它比随机和重新分区便宜。
    • 如果初始数据均匀分布,则下游处理将能够更好地利用可用资源。

  •  类似资料:
    • 我有一个新手火花问题。下面是我试图执行的程序: 我试图做的计算(doComput)是昂贵的,因为内存限制,我决定将数据集重新分区为7000个分区(即使我有1200个执行器,所以一次只能执行1200个)。然而,在完成计算后,我尝试写入s3,它大部分工作正常,但很少有任务最终超时,工作被重试。 1) 为什么在我保存在进行昂贵计算后生成的RDD时,整个作业都会被重试? 2) 我试图在持久化之后合并,但s

    • 问题内容: 我正在尝试在两个数据帧之间合并。每个数据帧都有两个索引级别(日期,客户)。在列中,例如,某些列在两者之间匹配(货币,日期)。 按索引合并这些内容的最佳方法是什么,但不要采用两个副本的货币和日期。 每个数据框都是90列,所以我试图避免用手将所有内容写出来。 如果我做: 我懂了 谢谢!… 问题答案: 您可以算出仅在一个DataFrame中的列,并使用它来选择合并中列的子集。 然后执行合并(

    • 引用https://cloud.google.com/load-balancing/docs/https/setting-up-https-serverless#enableing 虽然Google Cloud Armor可以配置为具有云运行(完全管理)、云功能和App Engine后端的后端服务,但这种功能有一定的限制,尤其是云运行(完全管理)和App Engine。有权访问Google Clo

    • 我有: 我尝试使用map()和filter()实现myFunction,但无法使其正常工作。如果我有接近的想法,我会展示一些我尝试过的代码,不幸的是,情况并非如此。任何帮助都将不胜感激!

    • 为了更新一些图像,我使用了“docker-compose pull”。然后我构建:'docker-compose build'。 下次我怎么避免这个?

    • 我不断遇到需要通过映射或集合保存状态的解决方案。e、 g.创建一个返回在输入中找到的重复项的方法 我的Java8流解决方案,不幸的是,我正在使用哈希集进行过滤。我理解这并不“恰当”,因为这取决于州。没有州是建议还是硬性规定?这只是运行并行流时的问题吗?有人能推荐一种不使用哈希集的方法吗?