我正在从s3读取csv文件,并以ORC的身份写入配置单元表。在写的同时,它也在写大量的小文件。我需要合并所有这些文件。我设置了以下属性:
spark.sql("SET hive.merge.sparkfiles = true")
spark.sql("SET hive.merge.mapredfiles = true")
spark.sql("SET hive.merge.mapfiles = true")
spark.sql("set hive.merge.smallfiles.avgsize = 128000000")
spark.sql("set hive.merge.size.per.task = 128000000")
除了这些配置之外,我尝试了repartition(1)和coalesce(1),这将合并到单个文件中,但它会删除配置单元表并重新创建它。
masterFile.repartition(1).write.mode(SaveMode.Overwrite).partitionBy(<partitioncolumn>).orc(<HIVEtablePath>);
如果我使用追加模式而不是覆盖模式,它会在每个分区下创建重复文件。
masterFile.repartition(1).write.mode(SaveMode.Append).partitionBy(<partitioncolumn>).orc(<HIVEtablePath>);
在这两种情况下,spark作业运行两次,在第二次执行时失败。
有没有任何方法可以使用附加模式的重新分区/合并,而不重复每个分区中的部分文件?
masterFile.repartition(1).write.mode(SaveMode.Overwrite).partitionBy(<partitioncolumn>).orc(<HIVEtablePath>)
.orc()方法将数据作为文件写入,不触及元信息。因此无法覆盖配置单元中的表。
如果要覆盖配置单元表中数据,请使用method.insertinto(hive_table_name),其中hive_table_name是配置单元中表的全名(模式+表名)
根据你的例子
masterFile.repartition(1).write.mode(SaveMode.Overwrite).partitionBy(<partitioncolumn>).insertInto(hiveTableName)
还可以用元数据信息覆盖数据。具有覆盖修饰符的saveastable(hive_table_name)也将覆盖转移的数据。
masterFile.repartition(1).write.mode(SaveMode.Overwrite).partitionBy(<partitioncolumn>).saveAsTable(hiveTableName)
我有一个包含呼叫数据记录的配置单元表。我在电话号码上对表进行了分区,并在CALL_DATE上对表进行了bucked处理。现在,当我在hive中插入数据时,过时的call_date会在我的bucket中创建小文件,这会创建名称、节点、元数据、增加和性能降低。有没有办法把这些小文件合二为一。
问题内容: SO和Web上的大多数问题/答案都讨论了如何使用Hive将一堆小的ORC文件组合成一个更大的文件,但是,我的ORC文件是日志文件,每天都分开,因此我需要将它们分开。我只想每天“汇总” ORC文件(它们是HDFS中的目录)。 我最有可能需要用Java编写解决方案,并且遇到过OrcFileMergeOperator,这可能是我需要使用的内容,但还为时过早。 解决此问题的最佳方法是什么? 问
我有一个超过10亿行的DataFrame(df) 从上面的命令中,我了解到我的100个工作节点集群(spark 2.4.5)中只有5个工作节点将执行所有任务。使用聚结剂(5)需要7小时才能完成。 我应该尝试< code >重新分区而不是< code >联合? 有没有一种更快速/高效的方法来写出128 MB大小的拼花文件,或者我需要首先计算数据帧的大小来确定需要多少分区。 例如,如果我的数据帧大小为
我在我的项目中使用spack-sql-2.3.1v、kafka和java8。与 在消费者方面,我尝试使用下面的代码在hdfs me中编写文件 当我存储到hdfs文件夹中时,它看起来像下面的东西,即每个文件都在1.5k即几个KB。 由于这个小文件,它需要大量的处理时间,而我从hdfs中读取更大的数据集 问题: > 如果我想计算给定hdfs文件夹中的记录总数,如何计算? 新更改后 运行成功结果包括:
问题内容: 我有一个结果RDD 。输出格式如下: 我想要的是创建一个CSV文件,其中一列用于(上面输出中的元组的第一部分),另一列用于(元组输出的第二部分)。但我不知道如何使用Python在Spark中写入CSV文件。 如何使用上述输出创建CSV文件? 问题答案: 然后只需将RDD()的行转换为字符串(CSV的行)即可。
使用火花流读取和处理来自Kafka的消息并写入HDFS-Hive。由于我希望避免创建许多垃圾文件系统的小文件,我想知道是否有办法确保最小的文件大小,和/或强制在文件中输出行数最少的能力,超时除外。谢谢。