我正在 pyspark 中对 (x,y) 点的 RDD 实现范围查询。我将 xy 空间划分为一个 16*16 的网格(256 个单元格),并将 RDD 中的每个点分配给其中一个单元格。gridMappedRDD 是一个 PairRDD:(cell_id,Point 对象)
我将这个RDD分区为256个分区,使用:
gridMappedRDD.partitionBy(256)
范围查询是一个矩形框。我为我的网格对象准备了一个方法,它可以返回与查询范围重叠的单元格id列表。所以,我用这个作为过滤器来删除不相关的单元格:
filteredRDD = gridMappedRDD.filter(lambda x: x[0] in candidateCells)
但问题是,在运行查询然后收集结果时,会评估所有 256 个分区;为每个分区创建一个任务。
为了避免这个问题,我尝试将过滤后的RDD合并为候选单元格列表的长度,我希望这可以解决问题。
filteredRDD.coalesce(len(candidateCells))
事实上,生成的RDD具有< code>len(candidateCells)分区,但这些分区与< code>gridMappedRDD不同。
正如在coalesce留档中所述,schffle
参数是False,不应该在分区之间执行随机切换,但我可以看到(在Glom()的帮助下)情况并非如此。
例如,在与
之后,分区如下:candidateCells=[62,63,78,79]
合并(4)
[[(62, P), (62, P) .... , (63, P)],
[(78, P), (78, P) .... , (79, P)],
[], []
]
实际上,通过合并,我有一个随机读取,它等于我的每个任务的整个数据集的大小,这需要很长时间。我需要的是一个RDD,只有与candidateCells中的单元格相关的分区,没有任何洗牌。所以,我的问题是,有没有可能只过滤一些分区而不重新洗牌?对于上面的例子,我的filteredRDD将有4个分区,其数据与originalRDD的62、63、78、79个分区完全相同。这样做,查询可以只针对受影响的分区。
您在这里做出了一些不正确的假设:
合并
无关(合并
在这里也没有用)。它是由分区引起的
。根据定义,分区需要随机播放。筛选器
。Spark对你使用的功能一无所知(它是一个黑匣子)。你能做什么:
>
如果结果子集是小的重新分区,并对每个键应用查找
:
from itertools import chain
partitionedRDD = gridMappedRDD.partitionBy(256)
chain.from_iterable(
((c, x) for x in partitionedRDD.lookup(c))
for c in candidateCells
)
如果数据很大,您可以尝试跳过扫描分区(任务数量不会改变,但某些任务可能会短路):
candidatePartitions = [
partitionedRDD.partitioner.partitionFunc(c) for c in candidateCells
]
partitionedRDD.mapPartitionsWithIndex(
lambda i, xs: (x for x in xs if x[0] in candidateCells) if i in candidatePartitions else []
)
这两种方法只有在执行多次“查找”时才有意义。如果是一次性操作,最好执行线性滤波:
我有一个新手火花问题。下面是我试图执行的程序: 我试图做的计算(doComput)是昂贵的,因为内存限制,我决定将数据集重新分区为7000个分区(即使我有1200个执行器,所以一次只能执行1200个)。然而,在完成计算后,我尝试写入s3,它大部分工作正常,但很少有任务最终超时,工作被重试。 1) 为什么在我保存在进行昂贵计算后生成的RDD时,整个作业都会被重试? 2) 我试图在持久化之后合并,但s
问题内容: 我正在尝试在两个数据帧之间合并。每个数据帧都有两个索引级别(日期,客户)。在列中,例如,某些列在两者之间匹配(货币,日期)。 按索引合并这些内容的最佳方法是什么,但不要采用两个副本的货币和日期。 每个数据框都是90列,所以我试图避免用手将所有内容写出来。 如果我做: 我懂了 谢谢!… 问题答案: 您可以算出仅在一个DataFrame中的列,并使用它来选择合并中列的子集。 然后执行合并(
引用https://cloud.google.com/load-balancing/docs/https/setting-up-https-serverless#enableing 虽然Google Cloud Armor可以配置为具有云运行(完全管理)、云功能和App Engine后端的后端服务,但这种功能有一定的限制,尤其是云运行(完全管理)和App Engine。有权访问Google Clo
我有: 我尝试使用map()和filter()实现myFunction,但无法使其正常工作。如果我有接近的想法,我会展示一些我尝试过的代码,不幸的是,情况并非如此。任何帮助都将不胜感激!
为了更新一些图像,我使用了“docker-compose pull”。然后我构建:'docker-compose build'。 下次我怎么避免这个?
我不断遇到需要通过映射或集合保存状态的解决方案。e、 g.创建一个返回在输入中找到的重复项的方法 我的Java8流解决方案,不幸的是,我正在使用哈希集进行过滤。我理解这并不“恰当”,因为这取决于州。没有州是建议还是硬性规定?这只是运行并行流时的问题吗?有人能推荐一种不使用哈希集的方法吗?