使用Python 3.6在Amazon EMR集群(1个主节点,2个节点)上运行Spark 2.4.2
我正在Amazon s3中读取对象,将其压缩为拼花格式,并将其添加(附加)到现有的拼花数据存储中。当我在pyspark shell中运行代码时,我能够读取/压缩对象,并将新的拼花文件添加到现有的拼花文件中,当我对拼花数据运行查询时,它显示所有数据都在拼花文件夹中。但是,当我在EMR集群上的步骤中运行代码时,现有的拼花文件将被新文件覆盖。相同的查询将显示只有新数据存在,而包含拼花数据的s3文件夹只有新数据。
以下是该步骤的关键代码:
spark = SparkSession.builder \
.appName("myApp") \
.getOrCreate()
df_p = spark.read \
.format('parquet') \
.load(parquet_folder)
the_schema = df_p.schema
df2 = spark.read \
.format('com.databricks.spark.xml') \
.options(rowTag='ApplicationSubmission', \
path=input_folder) \
.schema(the_schema) \
.load(input_folder+'/*.xml')
df2.coalesce(10) \
.write \
.option('compression', 'snappy') \
.option('path', parquet_folder) \
.format('parquet') \
.mode('append') \
.saveAsTable(table_name, mode='append')
我预计这会将来自input\u文件夹的数据附加到parquet\u文件夹中的现有数据,但在EMR步骤中执行时会被覆盖。我尝试过在中不使用
mode='append'
。saveAsTable(在pyspark shell中不需要)。
建议?
我不知道为什么你的方法不起作用,但我使用<代码>得到了更好的结果。拼花地板(路径)而不是。保存表(…)
。我不知道这种行为的原因,但我以前没有见过用于保存数据对象的saveAsTable,因为它在配置单元元存储中创建了一个表(不是“物理”数据对象)。
如果您的步骤在Apache Livy中运行,那么它们的行为可能与在shell上的行为不同。如果您确实在使用Livy,您可以在齐柏林飞艇笔记本上测试您的代码,在您的代码单元格上指示您应该使用Livy-pyspark执行器运行代码。
如何使用pyarrow向拼花地板文件添加/更新? 我在文档中找不到任何关于附加拼花文件的内容。此外,您是否可以将pyarrow与多处理一起使用来插入/更新数据。
我如何一次加载5年的拼花数据并复制到一个表中?因为1个月的负荷比我1.5个小时,5年就要花我90个小时。如果有可能并行加载?我该怎么做呢? 谢谢
我是Spark的新手。我尝试在本地模式(windows)下使用spark java将csv文件保存为parquet。我得到了这个错误。 原因:org.apache.spark.Spark异常:写入行时任务失败 我引用了其他线程并禁用了spark推测 set("spark.speculation "," false ") 我还是会出错。我在csv中只使用了两个专栏进行测试。 输入: 我的代码: 请帮
则错误如下: AttributeError:“property”对象没有属性“parquet”
我们正在寻找一种解决方案,以便创建一个外部配置单元表,根据parquet/avro模式从parquet文件中读取数据。 换句话说,如何从拼花/avro模式生成hive表? 谢谢:)
我们需要每天将文本数据转换为拼花地板/avro,如果输入来自多个具有不同结构的源,我们希望使用基于spark sql的scala代码来实现这一点,而不考虑分隔符和列数或结构。