我试图做一些非常简单的事情,我有一些非常愚蠢的挣扎。我想这一定与对火花的基本误解有关。我非常感谢任何帮助或解释。
我有一张非常大的桌子(~3 TB,~300毫米行,25k个分区),在s3中保存为拼花地板,我想给一些人一个很小的拼花文件样本。不幸的是,这要花很长时间才能完成,我不明白为什么。我尝试了以下方法:
tiny = spark.sql("SELECT * FROM db.big_table LIMIT 500")
tiny.coalesce(1).write.saveAsTable("db.tiny_table")
然后当这不起作用时,我尝试了这个,我认为应该是一样的,但我不确定。(我添加了print
,以尝试调试。)
tiny = spark.table("db.big_table").limit(500).coalesce(1)
print(tiny.count())
print(tiny.show(10))
tiny.write.saveAsTable("db.tiny_table")
当我观看Thread UI时,打印语句和write
都使用25k映射器。计数
耗时3分钟,显示
耗时25分钟,写入
耗时约40分钟,尽管它最终写入了我想要的单文件表。
在我看来,第一行应该把前500行合并到一个分区,然后其他行应该发生得非常快(在一个映射器/减速器上)。有人能看到我在这里做错了什么吗?有人告诉我,也许我应该使用示例
而不是限制
,但据我所知,限制
应该快得多。对吗?
提前感谢任何想法!
试试这个,根据我的经验,重新划分对这类问题更有效:
tiny = spark.sql("SELECT * FROM db.big_table LIMIT 500")
tiny.repartition(1).write.saveAsTable("db.tiny_table")
如果您对拼花地板感兴趣,则无需将其保存为桌子:
tiny = spark.sql("SELECT * FROM db.big_table LIMIT 500")
tiny.repartition(1).write.parquet(your_hdfs_path+"db.tiny_table")
我将首先讨论print
函数的问题,因为这是理解火花的基础。然后是限制
vs示例
。然后是重新分区
vs合并
。
print
函数以这种方式花费这么长时间的原因是因为coalesce
是一个懒惰的转换。火花中的大多数转换都是懒惰的,直到调用了一个操作才会进行计算。
动作是做一些事情,并且(大部分)不会因此返回新的数据帧。比如计数
,显示
。它们返回一个数字和一些数据,而coalesce
返回一个带有1个分区的数据帧(某种程度上,见下文)。
发生的情况是,每次调用tiny
数据帧上的操作时,都会重新运行sql查询和coalesce
调用。这就是为什么他们在每次通话中都使用25k映射器。
要节省时间,请添加。cache()
方法到第一行(对于打印
代码)。
然后在第一行上实际执行数据帧转换,结果保存在spark节点的内存中。
这不会对第一行的初始查询时间产生任何影响,但至少不会再运行该查询两次,因为结果已被缓存,然后操作可以使用该缓存结果。
要将其从内存中删除,请使用。unpersist()
方法。
现在来看看你要做的实际查询。。。
这实际上取决于数据的分区方式。比如,它是在特定的字段上划分的吗。。。
你在你的问题中提到了它,但是示例可能是正确的方法。
这是为什么呢?
limit
必须搜索前500行。除非数据是按行号(或某种递增id)分区的,否则前500行可以存储在25k分区中的任何一个分区中。
所以spark必须搜索所有这些,直到找到所有正确的值。不仅如此,它还必须执行一个额外的步骤,对数据进行排序,以获得正确的顺序。
示例只抓取500个随机值。做起来容易得多,因为不需要对所涉及的数据进行顺序/排序,也不需要在特定的分区中搜索特定的行。
虽然限制可以更快,但它也有它的限制。我通常只在非常小的子集中使用它,比如10/20行。
现在是分区。。。。
我认为,coalesce
的问题在于它实际上改变了分区。现在我不确定这件事,所以还是少说点盐吧。
根据pyspark
文档:
这个操作会导致一个狭隘的依赖关系,例如,如果从1000个分区到100个分区,则不会出现无序排列,而是100个新分区中的每一个都会占用当前分区中的10个。
因此,实际上,500行仍然位于spark认为是1个虚拟分区的25k物理分区上。
导致混乱(通常是不好的)并用保持在spark内存中。重新分配(1)。cache()
在这里可能是个好主意。因为当你写时,不需要25k映射器查看物理分区,它只会导致1个映射器查看spark内存中的内容。然后写就变得容易了。您还需要处理一个小的子集,因此(希望)任何洗牌都应该是可管理的。
显然,这通常是不好的做法,并且不能改变spark在执行原始sql查询时可能希望运行25k映射程序的事实。希望
sample
能解决这个问题。
编辑以澄清洗牌、
重新分区
和合并
在一个4节点集群上,16个分区中有2个数据集。您希望加入它们,并在16个分区中作为一个新的数据集进行编写。
数据1的行1可能位于节点1上,数据2的行1可能位于节点4上。
为了将这些行连接在一起,Spark必须物理地移动其中一个或两个行,然后写入到新分区。
这是一种洗牌,在集群中物理地移动数据。
所有东西都被16分区并不重要,重要的是数据在集群上的位置。
数据。重新分区(4)
将数据从每个节点的4组分区中物理地移动到每个节点的1个分区中。
Spark可能会将所有4个分区从节点1移动到其他3个节点,在这些节点上的一个新分区中,反之亦然。
我不认为它会这样做,但这是一个证明这一点的极端案例。
不过,
coalesce(4)
调用不会移动数据,它更聪明。相反,它承认“我已经有了每个节点4个分区
所以它不需要移动任何数据,因为它只是将现有分区合并到一个连接的分区中。
我有一个很大的数据框,我正在HDFS中写入拼花文件。从日志中获取以下异常: 谷歌对此进行了搜索,但找不到任何具体的解决方案。将推测设置为false:conf.Set(“spark.投机”,“false”) 但仍然没有帮助。它只完成了几个任务,生成了几个零件文件,然后突然因此错误而停止。 详细信息:Spark版本:2.3.1(这在1.6x中没有发生) 只有一个会话正在运行,这排除了不同会话访问同一位
如何将数据帧中的数据写入到单个。拼花地板文件(两个数据 df.rdd.get1个分区 1个 如果我使用上述命令在HDFS中创建拼花文件,它将在HDFS中创建目录“payloads.parquet”,并在该目录中创建多个文件。拼花地板文件,元数据文件正在保存。 找到4项 如何将数据帧中的数据写入单个文件(两个数据 帮助将不胜感激。
我正试图在模式下将写入文件格式(在最新的pandas版本0.21.0中引入)。但是,文件将被新数据覆盖,而不是附加到现有文件。我错过了什么? 写入语法是 读取语法是
我正在使用Spark 2.3,我需要将Spark数据帧保存到csv文件中,我正在寻找更好的方法。。查看相关/类似的问题,我发现了这个问题,但我需要一个更具体的: 如果DataFrame太大,如何避免使用Pandas?因为我使用了函数(下面的代码),它产生了: 内存不足错误(无法分配内存)。 使用文件I/O直接写入csv是更好的方法吗?它可以保留分隔符吗? 使用df。聚结(1)。写选项(“标题”、“
我正在使用Apache Spark生成拼花文件。我可以按日期划分它们,没有问题,但在内部我似乎无法按正确的顺序排列数据。 在处理过程中,顺序似乎丢失了,这意味着拼花地板元数据是不正确的(具体来说,我想确保拼花地板行组反映排序顺序,以便特定于我的用例的查询可以通过元数据有效地过滤)。 考虑以下示例: 使用这种方法,我确实得到了正确的拼花地板分区结构(按日期)。更好的是,对于每个日期分区,我看到一个大
Spark版本:2.3 hadoop dist:azure Hdinsight 2.6.5平台:azure存储:BLOB 集群中的节点:6个执行器实例:每个执行器6个内核:每个执行器3个内存:8gb 试图通过同一存储帐户上的spark数据框将azure blob(wasb)中的csv文件(大小4.5g-280列,2.8 mil行)加载到拼花格式。我重新划分了大小不同的文件,即20、40、60、10