当前位置: 首页 > 知识库问答 >
问题:

将新数据追加到已分区的拼花文件中

缪英锐
2023-03-14

我正在编写一个ETL进程,在该进程中,我需要每小时读取日志文件,对数据进行分区,并保存它。我正在使用Spark(在数据库中)。日志文件是CSV,所以我读取它们并应用模式,然后执行转换。

我的问题是,如何将每个小时的数据保存为拼花板格式,但添加到现有的数据集?保存时,我需要按DataFrame中存在的4列进行分区。

下面是我的保存行:

data
    .filter(validPartnerIds($"partnerID"))
    .write
    .partitionBy("partnerID","year","month","day")
    .parquet(saveDestination)

我使用parquet是因为分区大大增加了以后的查询。此外,我必须将数据以某种文件格式写入磁盘,不能使用Druid或Cassandra这样的数据库。

对于如何分区我的dataframe和保存文件(要么坚持parquet格式,要么采用其他格式)的任何建议都非常感谢。

共有1个答案

潘宝
2023-03-14

如果您需要追加文件,您肯定必须使用追加模式。我不知道您希望它生成多少个分区,但我发现如果您有许多分区,PartitionBy将导致许多问题(内存和IO问题)。

如果您认为您的问题是由于写操作耗时太长造成的,我建议您尝试以下两种方法:

1)通过在配置中添加以下内容来使用snappy:

conf.set("spark.sql.parquet.compression.codec", "snappy")

2)禁用在SparkContext上的HadoopConfiguration中生成元数据文件,如下所示:

sc.hadoopConfiguration.set("parquet.enable.summary-metadata", "false")

元数据文件的生成有点耗时(请参阅这篇博文),但根据这篇博文,它们实际上并不重要。就我个人而言,我总是禁用它们,没有问题。

如果您生成了许多分区(>500),恐怕我所能做的最好的事情就是建议您研究一个不使用append-mode的解决方案--我只是从来没有设法通过partitionby来使用如此多的分区。

 类似资料:
  • 如何读取带有条件作为数据帧的分区镶木地板, 这工作得很好, 分区存在的时间为< code>day=1到day=30是否可能读取类似于< code>(day = 5到6)或< code>day=5,day=6的内容, 如果我输入< code>*,它会给出所有30天的数据,而且太大了。

  • 问题内容: 我想计算一些列数据并将其作为列写入文件。然后,在计算了另一列数据之后,我想将其附加到同一文件中,但作为新列。 这是我所做的: 结果-它将新列追加到第一列下方,因此我只有一个长列。 谢谢, 问题答案: 您将必须逐行读取文件,然后将新列插入每一行。这是使用BufferedReader和BufferedWriter的解决方案

  • 我已经使用Spark生成了一些分区拼花地板数据,我想知道如何将其映射到Impala表。。。遗憾的是,我还没有找到任何解决办法。 拼花地板的架构如下: 我用和对其进行了分区,这为我的hdfs提供了这种目录: 您知道我如何告诉Impala从这个数据集创建一个具有相应分区的表(并且不必像我读到的那样在每个分区上循环)?有可能吗? 提前谢谢你

  • 使用Python 3.6在Amazon EMR集群(1个主节点,2个节点)上运行Spark 2.4.2 我正在Amazon s3中读取对象,将其压缩为拼花格式,并将其添加(附加)到现有的拼花数据存储中。当我在pyspark shell中运行代码时,我能够读取/压缩对象,并将新的拼花文件添加到现有的拼花文件中,当我对拼花数据运行查询时,它显示所有数据都在拼花文件夹中。但是,当我在EMR集群上的步骤中

  • 我试图利用火花分区。我试图做这样的事情 这里的问题每个分区都会创建大量的镶木地板文件,如果我尝试从根目录读取,则会导致读取缓慢。 为了避免这种情况,我试过 但是,这将创建每个分区中镶木地板文件的数目。现在我的分区大小不同了。因此,理想情况下,我希望每个分区都有单独的合并。然而,这看起来并不容易。我需要访问所有分区合并到一定数量并存储在单独的位置。 我应该如何使用分区来避免写入后出现许多文件?

  • 我有5个表存储为CSV文件(A.CSV、B.CSV、C.CSV、D.CSV、E.CSV)。每个文件按日期分区。如果文件夹结构如下: