当前位置: 首页 > 知识库问答 >
问题:

在spark流上下文中将RDD写入HDFS

傅星光
2023-03-14

我有一个火花1.2.0的火花流环境,我从本地文件夹中检索数据,每次我发现一个新文件添加到文件夹中时,我都会执行一些转换。

val ssc = new StreamingContext(sc, Seconds(10))
val data = ssc.textFileStream(directory)

为了对DStream数据执行分析,我必须将其转换为数组

var arr = new ArrayBuffer[String]();
   data.foreachRDD {
   arr ++= _.collect()
}

然后,我使用获得的数据提取我想要的信息,并将其保存在HDFS上。

val myRDD  = sc.parallelize(arr)
myRDD.saveAsTextFile("hdfs directory....")

由于我真的需要使用Array操作数据,因此不可能使用DStream.saveAsTextFiles("...")(这将正常工作)在HDFS上保存数据,我必须保存RDD,但使用此先决条件,我终于有了名为part-00000等的空输出文件...

使用arr.foreach(println)我能够看到传输的正确结果。

我的怀疑是火花在每个批次中尝试将数据写入相同的文件,删除之前写入的内容。我试图保存在一个动态命名文件夹中,如myRDD. saveAsTextFile("文件夹"System.currentTimeMillis(). toString()),但总是只创建一个文件夹,输出文件仍然为空。

如何在spark流上下文中将RDD写入HDFS?

共有2个答案

南门朗
2023-03-14

@vzamboni:Spark 1.5 dataframes api具有以下功能:

dataframe.write().mode(SaveMode.Append).format(FILE_FORMAT).partitionBy("parameter1", "parameter2").save(path);
韩阳飙
2023-03-14

您使用的Spark Streaming的方式不是设计的。我建议在您的用例中放弃使用Spark,或者修改您的代码,使其以Spark的方式工作。将阵列收集到驱动程序违背了使用分布式引擎的目的,并使您的应用程序有效地成为一台机器(两台机器也会导致比仅在一台机器上处理数据更大的开销)。

您可以用数组做的一切,都可以用Spark来做。因此,只需在流中运行您的计算,分布在辅助角色上,然后使用DStream.saveAsTextFiles()编写输出。您可以使用foreachRDDsaveAsParque(path, overwrite=true)写入单个文件。

 类似资料:
  • 问题内容: 我有一个结果RDD 。输出格式如下: 我想要的是创建一个CSV文件,其中一列用于(上面输出中的元组的第一部分),另一列用于(元组输出的第二部分)。但我不知道如何使用Python在Spark中写入CSV文件。 如何使用上述输出创建CSV文件? 问题答案: 然后只需将RDD()的行转换为字符串(CSV的行)即可。

  • 问题内容: 我正在探索Spark进行批处理。我正在使用独立模式在本地计算机上运行spark。 我正在尝试使用saveTextFile()方法将Spark RDD转换为单个文件[最终输出],但无法正常工作。 例如,如果我有多个分区,我们如何获得一个文件作为最终输出。 更新: 我尝试了以下方法,但是我得到了空指针异常。 例外是: 此致Shankar 问题答案: 您可以使用方法保存到单个文件中。这样,您

  • 因此,如何跨辅助节点对RDD进行分区,是将被分区的单个RDD还是一个完整的批处理。 我可能拿错了。请指引我

  • 经过一些处理后,我得到了一个DStream[字符串,ArrayList[字符串]],所以当我使用saveAsTextFile将其写入hdfs时,每个批处理后它都会覆盖数据,所以如何通过附加到以前的结果来写入新结果 编辑:: 如果有人可以帮助我将输出转换为avro格式,然后写入HDFS并附加

  • 在Spark流式传输中,是否可以将特定的RDD分区分配给集群中的特定节点(为了数据局部性?) 例如,我得到一个事件流[a,a,a,b,b],并有一个2节点的Spark集群。 我希望所有的a总是去节点1,所有的b总是去节点2。 谢啦!

  • 我尝试从Kafka加载数据,这是成功的,但我无法转换为火花RDD, 现在如何读取此流对象???我的意思是将其转换为Spark数据帧并执行一些计算 我尝试转换到dataframe 但是toDf不工作错误:value toDf不是org.apache.spark.rdd.RDD的成员[org.apache.spark.sql.行]