我正在与AWS工作,我有使用Spark和Hive的工作流。我的数据是按日期分区的,所以每天我的S3存储中都有一个新分区。我的问题是,当有一天加载数据失败时,我不得不重新执行那个分区。接下来编写的代码是:
df // My data in a Dataframe
.write
.format(getFormat(target)) // csv by default, but could be parquet, ORC...
.mode(getSaveMode("overwrite")) // Append by default, but in future it should be Overwrite
.partitionBy(partitionName) // Column of the partition, the date
.options(target.options) // header, separator...
.option("path", target.path) // the path where it will be storage
.saveAsTable(target.tableName) // the table name
在我的流动中发生了什么?如果我使用savemode.overwrite,完整的表将被删除,并且只保存分区。如果我使用savemode.append,我可能会有重复的数据。
| A | B | C |
|---|---|---|
| b | 1 | 2 |
| c | 1 | 2 |
| A | B | C |
|---|---|---|
| a | 1 | 2 |
| b | 5 | 2 |
我想要的是:在表中,分区A
留在表中,分区B
用数据覆盖,并添加分区C
。有没有任何解决方案使用火花,我可以做到这一点?
我的最后一个选择是首先删除要保存的分区,然后使用savemode.append,但如果没有其他解决方案,我会尝试这样做。
如果您使用的是Spark2.3.0,请尝试将Spark.sql.sources.PartitionOverwriteMode
设置设置为Dynamic
,则需要对数据集进行分区,并覆盖写模式。
spark.conf.set("spark.sql.sources.partitionOverwriteMode","dynamic")
data.write.mode("overwrite").insertInto("partitioned_table")
非常感谢任何帮助。
我使用,但这给我带来了partitionBy和intsertInto不能同时使用的问题。
我正在编写一个将HBASE-0.98.19与HIVE-1.2.1集成的示例。我已经使用以下命令在hbase中创建了一个表 然后创建了'testemp'用于将数据导入到'hbase_table_emp'。下面的代码显示了创建'testemp'表的方法 到现在,一切正常。但当我运行命令时 ps:类路径中包含了hbase.jar、zookeeper.jar和guava.jar。 提前道谢。
我正在编写一个spark应用程序,并使用sbt assembly创建一个fat jar,我可以将其发送到spark-submit(通过Amazon EMR)。我的应用程序使用typesafe-config,在我的目录中有一个文件。我的jar在Amazon S3上,我使用命令创建一个新的spark作业(将jar下载到集群并发送到spark-submit)。我知道,一般来说,我可以使用来覆盖这些设置。
我设置了一个AWS EMR集群,其中包括Spark 2.3.2、hive 2.3.3和hbase 1.4.7。如何配置Spark以访问hive表? 我采取了以下步骤,但结果是错误消息: Java语言lang.ClassNotFoundException:java。lang.NoClassDefFoundError:org/apache/tez/dag/api/SessionNotRunning使用