当前位置: 首页 > 知识库问答 >
问题:

将spark数据帧写入单个拼花文件

谷梁弘深
2023-03-14

我试图做一些非常简单的事情,我有一些非常愚蠢的挣扎。我想这一定与对火花的基本误解有关。我非常感谢任何帮助或解释。

我有一张非常大的桌子(~3 TB,~300毫米行,25k个分区),在s3中保存为拼花地板,我想给一些人一个很小的拼花文件样本。不幸的是,这要花很长时间才能完成,我不明白为什么。我尝试了以下方法:

tiny = spark.sql("SELECT * FROM db.big_table LIMIT 500")
tiny.coalesce(1).write.saveAsTable("db.tiny_table")

然后当这不起作用时,我尝试了这个,我认为应该是一样的,但我不确定。(我添加了print,以尝试调试。)

tiny = spark.table("db.big_table").limit(500).coalesce(1)
print(tiny.count())
print(tiny.show(10))
tiny.write.saveAsTable("db.tiny_table")

当我观看Thread UI时,打印语句和write都使用25k映射器。计数耗时3分钟,显示耗时25分钟,写入耗时约40分钟,尽管它最终写入了我想要的单文件表。

在我看来,第一行应该把前500行合并到一个分区,然后其他行应该发生得非常快(在一个映射器/减速器上)。有人能看到我在这里做错了什么吗?有人告诉我,也许我应该使用示例而不是限制,但据我所知,限制应该快得多。对吗?

提前感谢任何想法!

共有2个答案

傅乐湛
2023-03-14

试试这个,根据我的经验,重新划分对这类问题更有效:

tiny = spark.sql("SELECT * FROM db.big_table LIMIT 500")
tiny.repartition(1).write.saveAsTable("db.tiny_table")

如果您对拼花地板感兴趣,则无需将其保存为桌子:

tiny = spark.sql("SELECT * FROM db.big_table LIMIT 500")
tiny.repartition(1).write.parquet(your_hdfs_path+"db.tiny_table")
堵存
2023-03-14

我将首先讨论print函数的问题,因为这是理解火花的基础。然后是限制vs示例。然后是重新分区vs合并

print函数以这种方式花费这么长时间的原因是因为coalesce是一个懒惰的转换。火花中的大多数转换都是懒惰的,直到调用了一个操作才会进行计算。

动作是做一些事情,并且(大部分)不会因此返回新的数据帧。比如计数显示。它们返回一个数字和一些数据,而coalesce返回一个带有1个分区的数据帧(某种程度上,见下文)。

发生的情况是,每次调用tiny数据帧上的操作时,都会重新运行sql查询和coalesce调用。这就是为什么他们在每次通话中都使用25k映射器。

要节省时间,请添加。cache()方法到第一行(对于打印代码)。

然后在第一行上实际执行数据帧转换,结果保存在spark节点的内存中。

这不会对第一行的初始查询时间产生任何影响,但至少不会再运行该查询两次,因为结果已被缓存,然后操作可以使用该缓存结果。

要将其从内存中删除,请使用。unpersist()方法。

现在来看看你要做的实际查询。。。

这实际上取决于数据的分区方式。比如,它是在特定的字段上划分的吗。。。

你在你的问题中提到了它,但是示例可能是正确的方法。

这是为什么呢?

limit必须搜索前500行。除非数据是按行号(或某种递增id)分区的,否则前500行可以存储在25k分区中的任何一个分区中。

所以spark必须搜索所有这些,直到找到所有正确的值。不仅如此,它还必须执行一个额外的步骤,对数据进行排序,以获得正确的顺序。

示例只抓取500个随机值。做起来容易得多,因为不需要对所涉及的数据进行顺序/排序,也不需要在特定的分区中搜索特定的行。

虽然限制可以更快,但它也有它的限制。我通常只在非常小的子集中使用它,比如10/20行。

现在是分区。。。。

我认为,coalesce的问题在于它实际上改变了分区。现在我不确定这件事,所以还是少说点盐吧。

根据pyspark文档:

这个操作会导致一个狭隘的依赖关系,例如,如果从1000个分区到100个分区,则不会出现无序排列,而是100个新分区中的每一个都会占用当前分区中的10个。

因此,实际上,500行仍然位于spark认为是1个虚拟分区的25k物理分区上。

导致混乱(通常是不好的)并用保持在spark内存中。重新分配(1)。cache()在这里可能是个好主意。因为当你写时,不需要25k映射器查看物理分区,它只会导致1个映射器查看spark内存中的内容。然后写就变得容易了。您还需要处理一个小的子集,因此(希望)任何洗牌都应该是可管理的。

显然,这通常是不好的做法,并且不能改变spark在执行原始sql查询时可能希望运行25k映射程序的事实。希望sample能解决这个问题。

编辑以澄清洗牌、重新分区合并

在一个4节点集群上,16个分区中有2个数据集。您希望加入它们,并在16个分区中作为一个新的数据集进行编写。

数据1的行1可能位于节点1上,数据2的行1可能位于节点4上。

为了将这些行连接在一起,Spark必须物理地移动其中一个或两个行,然后写入到新分区。

这是一种洗牌,在集群中物理地移动数据。

所有东西都被16分区并不重要,重要的是数据在集群上的位置。

数据。重新分区(4)将数据从每个节点的4组分区中物理地移动到每个节点的1个分区中。

Spark可能会将所有4个分区从节点1移动到其他3个节点,在这些节点上的一个新分区中,反之亦然。

我不认为它会这样做,但这是一个证明这一点的极端案例。

不过,coalesce(4)调用不会移动数据,它更聪明。相反,它承认“我已经有了每个节点4个分区

所以它不需要移动任何数据,因为它只是将现有分区合并到一个连接的分区中。

 类似资料:
  • 我有一个很大的数据框,我正在HDFS中写入拼花文件。从日志中获取以下异常: 谷歌对此进行了搜索,但找不到任何具体的解决方案。将推测设置为false:conf.Set(“spark.投机”,“false”) 但仍然没有帮助。它只完成了几个任务,生成了几个零件文件,然后突然因此错误而停止。 详细信息:Spark版本:2.3.1(这在1.6x中没有发生) 只有一个会话正在运行,这排除了不同会话访问同一位

  • 如何将数据帧中的数据写入到单个。拼花地板文件(两个数据 df.rdd.get1个分区 1个 如果我使用上述命令在HDFS中创建拼花文件,它将在HDFS中创建目录“payloads.parquet”,并在该目录中创建多个文件。拼花地板文件,元数据文件正在保存。 找到4项 如何将数据帧中的数据写入单个文件(两个数据 帮助将不胜感激。

  • 我正试图在模式下将写入文件格式(在最新的pandas版本0.21.0中引入)。但是,文件将被新数据覆盖,而不是附加到现有文件。我错过了什么? 写入语法是 读取语法是

  • 我正在使用Spark 2.3,我需要将Spark数据帧保存到csv文件中,我正在寻找更好的方法。。查看相关/类似的问题,我发现了这个问题,但我需要一个更具体的: 如果DataFrame太大,如何避免使用Pandas?因为我使用了函数(下面的代码),它产生了: 内存不足错误(无法分配内存)。 使用文件I/O直接写入csv是更好的方法吗?它可以保留分隔符吗? 使用df。聚结(1)。写选项(“标题”、“

  • 我正在使用Apache Spark生成拼花文件。我可以按日期划分它们,没有问题,但在内部我似乎无法按正确的顺序排列数据。 在处理过程中,顺序似乎丢失了,这意味着拼花地板元数据是不正确的(具体来说,我想确保拼花地板行组反映排序顺序,以便特定于我的用例的查询可以通过元数据有效地过滤)。 考虑以下示例: 使用这种方法,我确实得到了正确的拼花地板分区结构(按日期)。更好的是,对于每个日期分区,我看到一个大

  • Spark版本:2.3 hadoop dist:azure Hdinsight 2.6.5平台:azure存储:BLOB 集群中的节点:6个执行器实例:每个执行器6个内核:每个执行器3个内存:8gb 试图通过同一存储帐户上的spark数据框将azure blob(wasb)中的csv文件(大小4.5g-280列,2.8 mil行)加载到拼花格式。我重新划分了大小不同的文件,即20、40、60、10