当前位置: 首页 > 知识库问答 >
问题:

Spark结构化流应用程序将空拼花文件生成到Azure blob

凌善
2023-03-14

我正在读取来自Apache Kafka的json消息,然后使用Apache Spark在Azure blob存储中编写拼花文件。我使用方法partitionBy将这些拼花地板文件写入嵌套文件夹中。我的代码如下:

val sourceDF = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", brokerList)
      .option("subscribe", sourceTopic)
      .option("startingOffsets", "latest")
      .load()
      



sourceDF
      .select(...)
      .where("somecol"=="something")
      .writeStream
      .format("parquet")
      .option("path", outputPath+"/somepath")
      .option("checkpointLocation", checkpointLocation+"/somepath")
      .partitionBy("date","country")
      .queryName("test")
      .trigger(Trigger.ProcessingTime(interval+" seconds"))
      .start()

我注意到火花应用程序会产生空的镶木地板文件。这对我来说是一个瓶颈,因为我在hive导入过程中读取了这些镶木地板文件,并且抛出了一个异常,即这不是镶木地板文件(长度太小:0)

一般来说,我想禁止Spark streaming写入空文件。

共有1个答案

诸葛雨泽
2023-03-14

数据帧。rdd。isEmpty()//检查if中的空数据帧,并且不运行。

 类似资料:
  • 我们需要每天将文本数据转换为拼花地板/avro,如果输入来自多个具有不同结构的源,我们希望使用基于spark sql的scala代码来实现这一点,而不考虑分隔符和列数或结构。

  • 我在用spark-submit(2.4.0)提交的spark应用程序中发现了这个异常 用户类引发异常:org.apache.spark.sql.analysisException:为parquet找到了多个源(org.apache.spark.sql.execution.datasources.parquet.parquetFileFormat,org.apache.spark.sql.execu

  • 我正在使用Spark结构化流媒体;我的DataFrame具有以下架构 如何使用Parquet格式执行writeStream并写入数据(包含zoneId、deviceId、TimesInclast;除日期外的所有内容)并按日期对数据进行分区?我尝试了以下代码,但partition by子句不起作用

  • 在过去的几个月里,我已经使用了相当多的结构化流来实现流作业(在大量使用Kafka之后)。在阅读了《Stream Processing with Apache Spark》一书之后,我有这样一个问题:有没有什么观点或用例可以让我使用Spark Streaming而不是Structured Streaming?如果我投入一些时间来研究它,或者由于im已经使用了Spark结构化流,我应该坚持使用它,而之

  • 使用Python 3.6在Amazon EMR集群(1个主节点,2个节点)上运行Spark 2.4.2 我正在Amazon s3中读取对象,将其压缩为拼花格式,并将其添加(附加)到现有的拼花数据存储中。当我在pyspark shell中运行代码时,我能够读取/压缩对象,并将新的拼花文件添加到现有的拼花文件中,当我对拼花数据运行查询时,它显示所有数据都在拼花文件夹中。但是,当我在EMR集群上的步骤中