当前位置: 首页 > 知识库问答 >
问题:

附加到拼花文件的EMR Spark步骤正在覆盖拼花文件

费和惬
2023-03-14

使用Python 3.6在Amazon EMR集群(1个主节点,2个节点)上运行Spark 2.4.2

我正在Amazon s3中读取对象,将其压缩为拼花格式,并将其添加(附加)到现有的拼花数据存储中。当我在pyspark shell中运行代码时,我能够读取/压缩对象,并将新的拼花文件添加到现有的拼花文件中,当我对拼花数据运行查询时,它显示所有数据都在拼花文件夹中。但是,当我在EMR集群上的步骤中运行代码时,现有的拼花文件将被新文件覆盖。相同的查询将显示只有新数据存在,而包含拼花数据的s3文件夹只有新数据。

以下是该步骤的关键代码:

    spark = SparkSession.builder \
                        .appName("myApp") \
                        .getOrCreate()

    df_p = spark.read \
                .format('parquet') \
                .load(parquet_folder)

    the_schema = df_p.schema

    df2 = spark.read \
               .format('com.databricks.spark.xml') \
               .options(rowTag='ApplicationSubmission', \
                        path=input_folder) \
               .schema(the_schema) \
               .load(input_folder+'/*.xml')

    df2.coalesce(10) \
       .write \
       .option('compression', 'snappy') \
       .option('path', parquet_folder) \
       .format('parquet') \
       .mode('append') \
       .saveAsTable(table_name, mode='append')

我预计这会将来自input\u文件夹的数据附加到parquet\u文件夹中的现有数据,但在EMR步骤中执行时会被覆盖。我尝试过在中不使用mode='append'。saveAsTable(在pyspark shell中不需要)。

建议?


共有1个答案

司业
2023-03-14
匿名用户

我不知道为什么你的方法不起作用,但我使用<代码>得到了更好的结果。拼花地板(路径)而不是。保存表(…) 。我不知道这种行为的原因,但我以前没有见过用于保存数据对象的saveAsTable,因为它在配置单元元存储中创建了一个表(不是“物理”数据对象)。

如果您的步骤在Apache Livy中运行,那么它们的行为可能与在shell上的行为不同。如果您确实在使用Livy,您可以在齐柏林飞艇笔记本上测试您的代码,在您的代码单元格上指示您应该使用Livy-pyspark执行器运行代码。

 类似资料:
  • 如何使用pyarrow向拼花地板文件添加/更新? 我在文档中找不到任何关于附加拼花文件的内容。此外,您是否可以将pyarrow与多处理一起使用来插入/更新数据。

  • 我如何一次加载5年的拼花数据并复制到一个表中?因为1个月的负荷比我1.5个小时,5年就要花我90个小时。如果有可能并行加载?我该怎么做呢? 谢谢

  • 我是Spark的新手。我尝试在本地模式(windows)下使用spark java将csv文件保存为parquet。我得到了这个错误。 原因:org.apache.spark.Spark异常:写入行时任务失败 我引用了其他线程并禁用了spark推测 set("spark.speculation "," false ") 我还是会出错。我在csv中只使用了两个专栏进行测试。 输入: 我的代码: 请帮

  • 则错误如下: AttributeError:“property”对象没有属性“parquet”

  • 我们正在寻找一种解决方案,以便创建一个外部配置单元表,根据parquet/avro模式从parquet文件中读取数据。 换句话说,如何从拼花/avro模式生成hive表? 谢谢:)

  • 我们需要每天将文本数据转换为拼花地板/avro,如果输入来自多个具有不同结构的源,我们希望使用基于spark sql的scala代码来实现这一点,而不考虑分隔符和列数或结构。