所以问题是在主题中。我认为我没有正确理解重新分区的工作。在我的脑海中,当我说somedataset.repartition(600)
时,我希望所有数据都将在工作人员(假设60个工作人员)之间按相等的大小进行分区。
举个例子。我会在不平衡的文件中加载大量数据,比如400个文件,其中20%的文件大小为2Gb,其他80%的文件大小约为1Mb。我有加载此数据的代码:
val source = sparkSession.read.format("com.databricks.spark.csv")
.option("header", "false")
.option("delimiter","\t")
.load(mypath)
然后,我希望将原始数据转换为中间对象,过滤不相关的记录,转换为最终对象(带有附加属性),然后按一些列进行分区并写入parquet。在我看来,在不同的工作人员之间平衡数据(40000个分区)似乎是合理的,然后这样做:
val ds: Dataset[FinalObject] = source.repartition(600)
.map(parse)
.filter(filter.IsValid(_))
.map(convert)
.persist(StorageLevel.DISK_ONLY)
val count = ds.count
log(count)
val partitionColumns = List("region", "year", "month", "day")
ds.repartition(partitionColumns.map(new org.apache.spark.sql.Column(_)):_*)
.write.partitionBy(partitionColumns:_*)
.format("parquet")
.mode(SaveMode.Append)
.save(destUrl)
但它失败了
ExecutorLostFailure(执行器7因其中一个正在运行的任务而退出)原因:容器因超出内存限制而被YARN终止。使用34.6 GB的34.3 GB物理内存。考虑提高spark.yarn.executor.memoryOverhead。
当我不重新分区时,一切都很好。我不明白重新分区正确吗?
对于< code>repartition和< code>partitionBy来说,您的逻辑是正确的,但是在使用< code>repartition之前,您需要记住以下几点。
请记住,对数据进行重新分区是一项相当昂贵的操作。Spark还有一个名为coalesce()的repartition()优化版本,可以避免数据移动,但前提是要减少RDD分区的数量。
如果您希望必须完成任务,请增加驱动程序和执行程序内存
我在这里浏览了文档:https://spark . Apache . org/docs/latest/API/python/py spark . SQL . html 它说: 重新分区:生成的DataFrame是哈希分区的 对于repartitionByRange:结果DataFrame是范围分区的 而且之前的一个问题也提到了。然而,我仍然不明白它们到底有什么不同,当选择一个而不是另一个时会有什么
我想将数据帧“df1”划分为3列。此数据帧正好有990个针对这3列的唯一组合: 为了优化这个数据帧的处理,我想对df1进行分区,以获得990个分区,每个分区对应一个密钥: 我写了一个简单的方法来计算每个分区中的行数: 我注意到,实际上我得到的是628个带有一个或多个键值的分区,以及362个空分区。 我假设spark会以一种均匀的方式(1个键值=1个分区)重新分区,但这似乎不是这样,我觉得这种重新分
问题内容: 好吧,我试图理解并阅读可能导致它的原因,但我却无法理解: 我的代码中有这个地方: 事实是,当它尝试调用某些方法时,它将引发而不是其他预期的异常(特别是)抛出 。我实际上知道调用了什么方法,所以我直接转到该方法代码,并为应该抛出的行添加了一个块 ,它实际上按预期抛出。然而,当它上升时,以某种方式更改了上面的代码并没有 按预期进行。 是什么原因导致这种行为的?我该如何检查? 问题答案: 通
我有一个包含100个分区的df,在保存到HDFS之前,我想减少分区的数量,因为拼花文件太小了( 它可以工作,但将过程从每个文件 2-3 秒减慢到每个文件 10-20 秒。当我尝试重新分区时: 这个过程一点也不慢,每个文件2-3秒。 为什么?在减少分区数量时,合并不应该总是更快,因为它避免了完全洗牌吗? 背景: 我将文件从本地存储导入spark集群,并将生成的数据帧保存为拼花文件。每个文件大约100
为什么我在下面的代码段中的X轴上有一个溢出? 在我的网格容器上应用时,就会产生溢出。 null null https://codepen.io/anon/pen/wdjexz?editors=1100
问题内容: 我正在使用Python 3.5,根据PEP 492 ,它应该可以访问该语法,但是在尝试使用它时却收到了SyntaxError。我究竟做错了什么? 问题答案: 没有功能就不能使用。正如文档所说: 与async def函数一起使用async是SyntaxError。 但是此代码将起作用: 或看看docs中的示例。