我正在读取来自Apache Kafka的json消息,然后使用Apache Spark在Azure blob存储中编写拼花文件。我使用方法partitionBy将这些拼花地板文件写入嵌套文件夹中。我的代码如下:
val sourceDF = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", brokerList)
.option("subscribe", sourceTopic)
.option("startingOffsets", "latest")
.load()
sourceDF
.select(...)
.where("somecol"=="something")
.writeStream
.format("parquet")
.option("path", outputPath+"/somepath")
.option("checkpointLocation", checkpointLocation+"/somepath")
.partitionBy("date","country")
.queryName("test")
.trigger(Trigger.ProcessingTime(interval+" seconds"))
.start()
我注意到火花应用程序会产生空的镶木地板文件。这对我来说是一个瓶颈,因为我在hive导入过程中读取了这些镶木地板文件,并且抛出了一个异常,即这不是镶木地板文件(长度太小:0)
一般来说,我想禁止Spark streaming写入空文件。
数据帧。rdd。isEmpty()//检查if中的空数据帧,并且不运行。
我们需要每天将文本数据转换为拼花地板/avro,如果输入来自多个具有不同结构的源,我们希望使用基于spark sql的scala代码来实现这一点,而不考虑分隔符和列数或结构。
我在用spark-submit(2.4.0)提交的spark应用程序中发现了这个异常 用户类引发异常:org.apache.spark.sql.analysisException:为parquet找到了多个源(org.apache.spark.sql.execution.datasources.parquet.parquetFileFormat,org.apache.spark.sql.execu
我正在使用Spark结构化流媒体;我的DataFrame具有以下架构 如何使用Parquet格式执行writeStream并写入数据(包含zoneId、deviceId、TimesInclast;除日期外的所有内容)并按日期对数据进行分区?我尝试了以下代码,但partition by子句不起作用
在过去的几个月里,我已经使用了相当多的结构化流来实现流作业(在大量使用Kafka之后)。在阅读了《Stream Processing with Apache Spark》一书之后,我有这样一个问题:有没有什么观点或用例可以让我使用Spark Streaming而不是Structured Streaming?如果我投入一些时间来研究它,或者由于im已经使用了Spark结构化流,我应该坚持使用它,而之
使用Python 3.6在Amazon EMR集群(1个主节点,2个节点)上运行Spark 2.4.2 我正在Amazon s3中读取对象,将其压缩为拼花格式,并将其添加(附加)到现有的拼花数据存储中。当我在pyspark shell中运行代码时,我能够读取/压缩对象,并将新的拼花文件添加到现有的拼花文件中,当我对拼花数据运行查询时,它显示所有数据都在拼花文件夹中。但是,当我在EMR集群上的步骤中