当前位置: 首页 > 知识库问答 >
问题:

如何在HDFS中附加到相同的文件(火花2.11)

钮善
2023-03-14

我正在尝试使用SparkStreaming将流数据存储到HDFS中,但它会继续在新文件中创建附加到一个文件或几个多个文件中

如果它一直创建n个文件,我觉得效率不会很高

代码

lines.foreachRDD(f => {
  if (!f.isEmpty()) {
    val df = f.toDF().coalesce(1)
    df.write.mode(SaveMode.Append).json("hdfs://localhost:9000/MT9")
  }
 })

在我的pom中,我使用了各自的依赖项:

  • 火花-core_2.11
  • 火花-sql_2.11
  • 火花-streaming_2.11
  • 火花流-kafka-0-10_2.11

共有2个答案

上官彬
2023-03-14

每次重新初始化DataFrame变量时,它都会为每个rdd创建文件。我建议使用一个DataFrame变量,并在循环外部和每个rdd联合内部与本地DataFrame赋值为null。在使用外部数据帧进行循环写入之后。

殳经略
2023-03-14

正如您已经意识到的,Spark中的Append意味着写入现有目录而不是追加到文件。

这是有意的和期望的行为(想想如果进程在“附加”过程中失败会发生什么,即使格式和文件系统允许)。

如有必要,合并文件等操作应由单独的进程应用,以确保正确性和容错性。不幸的是,这需要完整的副本,出于明显的原因,不希望在批次到批次的基础上进行。

 类似资料:
  • 我有一个c#应用程序,可以创建拼花地板文件并将其上载到远程HDFS。如果我使用scp将文件复制到安装了HDFS客户端的目标计算机上,然后将文件“HDFS放入”HDFS中,spark可以正确读取文件。 如果我使用curl针对webhdf服务从客户端应用程序直接将文件上传到HDFS,则在尝试读取拼花文件时会从Spark收到以下错误: df=sqlContext。阅读parquet(“/tmp/test

  • 如何使用pyarrow向拼花地板文件添加/更新? 我在文档中找不到任何关于附加拼花文件的内容。此外,您是否可以将pyarrow与多处理一起使用来插入/更新数据。

  • 使用Python 3.6在Amazon EMR集群(1个主节点,2个节点)上运行Spark 2.4.2 我正在Amazon s3中读取对象,将其压缩为拼花格式,并将其添加(附加)到现有的拼花数据存储中。当我在pyspark shell中运行代码时,我能够读取/压缩对象,并将新的拼花文件添加到现有的拼花文件中,当我对拼花数据运行查询时,它显示所有数据都在拼花文件夹中。但是,当我在EMR集群上的步骤中

  • 问题内容: 我正在尝试 将 字符串 追加 到日志文件。但是,writeFile每次写入字符串之前都会擦除内容。 任何想法如何以简单的方式做到这一点? 问题答案: 对于偶尔的追加,您可以使用,每次调用时都会创建一个新的文件句柄: 异步地: 同步:

  • 问题内容: 我正在写某种生成加密日志文件的记录器。不幸的是,密码学不是我的强项。现在,我可以向文件写入几条消息,然后关闭文件。然后我可以打开它,附加一些消息,再次关闭,解密后,我在文件中间看到填充字节。有什么方法可以处理加密文件,而不必每次我想添加一些消息时都对其解密? 编辑 :更多细节。当前实现使用CipherOutputStream。据我了解,没有办法 寻求 使用它。如果我将控制输出数据大小可

  • 我只找到TextInputFormat和CsvInputFormat。那么,如何使用ApacheFlink读取HDFS中的拼花文件呢?