当前位置: 首页 > 知识库问答 >
问题:

如何在不覆盖的情况下将Spark流输出写入HDFS

景星华
2023-03-14

经过一些处理后,我得到了一个DStream[字符串,ArrayList[字符串]],所以当我使用saveAsTextFile将其写入hdfs时,每个批处理后它都会覆盖数据,所以如何通过附加到以前的结果来写入新结果

output.foreachRDD(r => {
  r.saveAsTextFile(path)
})

编辑:: 如果有人可以帮助我将输出转换为avro格式,然后写入HDFS并附加

共有3个答案

葛兴发
2023-03-14

将流输出存储到HDFS将始终创建一个新文件,即使在您使用带拼花的append导致Namenode上出现小文件问题的情况下也是如此。我建议您将输出写入序列文件,这样您就可以继续附加到同一个文件。

鲜于玮
2023-03-14

如果您想附加相同的文件并存储在文件系统中,请将其存储为拼花文件。您可以通过

  kafkaData.foreachRDD( rdd => {
  if(rdd.count()>0)
  {
    val df=rdd.toDF()
    df.write(SaveMode.Append).save("/path")
   }
杨凌
2023-03-14

saveAsTextFile不支持append。如果使用固定的文件名调用,它每次都会覆盖它。我们可以做saveAsTextFile(路径时间戳)每次保存到一个新文件。这是DStream.saveAsTextFiles(路径)的基本功能

Parquet是一种易于访问的格式,支持追加。我们首先将数据RDD转换为数据帧或数据集,然后我们可以从该抽象之上提供的写支持中获益。

case class DataStructure(field1,..., fieldn)

... streaming setup, dstream declaration, ...

val structuredOutput = outputDStream.map(record => mapFunctionRecordToDataStructure)
structuredOutput.foreachRDD(rdd => 
  import sparkSession.implicits._
  val df = rdd.toDF()
  df.write.format("parquet").mode("append").save(s"$workDir/$targetFile")

})

请注意,随着时间的推移,附加到拼花文件的成本会越来越高,因此仍需要不时旋转目标文件。

 类似资料:
  • 问题内容: 我有两个活动,我分别从这两个活动向Firestore添加数据。但是,每当我向Firestore添加第二个活动数据时,它就会覆盖第一个活动数据。我在以下两个活动中使用了以下代码: 如何停止覆盖?我想将两个“活动”数据保存在同一文件夹中。 问题答案: 我建议您再添加一个文档或集合,以便它可以为单个用户存储多个数据值。 您可以为两个活动创建文档参考: 或者,您可以为其创建一个子集合: 更多关

  • 问题内容: 我使用pandas以以下方式写入excel文件: Masterfile.xlsx已经包含许多不同的选项卡。但是,它尚未包含“ Main”。 熊猫正确地写到“主要”表,不幸的是,它也删除了所有其他标签。 问题答案: Pandas文档说,它对xlsx文件使用openpyxl。快速浏览一下其中的代码ExcelWriter可以提示可能会发生以下情况:

  • 问题内容: 如何在文件中添加文本而不覆盖旧文本。我使用模块fs(节点js) 我试过了这段代码,但是没有用。 任何建议,谢谢。 问题答案: 在此处检查标记:http : //nodejs.org/api/fs.html#fs_fs_open_path_flags_mode_callback-您当前正在使用哪个标记: ‘w +’-打开文件进行读写。文件已创建(如果不存在)或被截断(如果存在)。 您应该

  • 问题内容: 我使用熊猫以以下方式写入excel文件: 已经包含许多不同的选项卡。但是,它还不包含。 熊猫正确地写到“主要”表,不幸的是,它也删除了所有其他标签。 问题答案: pandas文档表示,它对文件使用。快速浏览一下其中的代码r可以提示可能会发生以下情况:

  • 问题内容: 我正在编写一个将执行or 或or 的C Shell程序。他们都希望在控制台输入(TTY)而不是stdin或命令行中输入密码。 有人知道解决方案吗? 设置无密码不是一种选择。 期望可能是一个选项,但在我的精简系统中不存在。 问题答案: 对于sudo,有一个-S选项,用于接受来自标准输入的密码。这是人员条目: 这将允许您运行以下命令: 至于ssh,我已经做了很多尝试来自动化/脚本化它的用法

  • 问题内容: Netbeans仔细地将Logger.getLogger(this.getClass()。getName())。log(Level。[…]语句撒到catch块中。现在,我想将它们全部指向一个文件(并指向控制台)。 每个日志教程(只有我这样)都告诉我如何获取特定的日志记录器以输出到文件中,但是我认为有比修复每个自动生成的日志记录语句更好的方法了?为某种类型的根记录程序设置处理程序? 问题