Question:

Structured Streaming won't write a DF to the file sink, citing /_spark_metadata/9.compact doesn't exist

董桐
2023-03-14

I am building a Kafka ingestion module on EMR 5.11.1 with Spark 2.2.1. My intent is to use Structured Streaming to consume from a Kafka topic, do some processing, and store the result in Parquet format on EMRFS/S3.

The console sink works as expected; the file sink does not.

In spark-shell:

import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.streaming.{OutputMode, Trigger}
import scala.concurrent.duration._

val event = spark.readStream.format("kafka")
  .option("kafka.bootstrap.servers", <server list>)
  .option("subscribe", <topic>)
  .load()

// readSchema: StructType of the JSON payload, defined elsewhere
val eventdf = event.select($"value" cast "string" as "json")
  .select(from_json($"json", readSchema) as "data")
  .select("data.*")

val outputdf = <some processing on eventdf>

This works:

val console_query = outputdf.writeStream.format("console")
  .trigger(Trigger.ProcessingTime(10.seconds))
  .outputMode(OutputMode.Append)
  .start

This does not:

val filesink_query = outputdf.writeStream
  .partitionBy(<some column>)
  .format("parquet")
  .option("path", <some path in EMRFS>)
  .option("checkpointLocation", "/tmp/ingestcheckpoint")
  .trigger(Trigger.ProcessingTime(10.seconds))
  .outputMode(OutputMode.Append)
  .start // fails

Things I have tried without success:

• sc.hadoopConfiguration.set("parquet.enable.summary-metadata", "false")
• Changing the format from Parquet to CSV
• Changing the output mode to Complete (only Append is supported)
• Different trigger intervals
• .option("failOnDataLoss", false) on readStream

Some digging around in the source code led me here: https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala, which indicates that a missing .compact file will trigger the defaults.
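To make the failure concrete, here is a small sketch of the compaction arithmetic in CompactibleFileStreamLog (a paraphrase, not a verbatim excerpt): with the default compactInterval of 10, batches 9, 19, 29, ... are compaction batches, and compacting batch 19 needs the previous compact file (9.compact) plus the delta files for batches 10-18.

// Sketch of the batch-id arithmetic (paraphrased from CompactibleFileStreamLog).
val compactInterval = 10
def isCompactionBatch(batchId: Long): Boolean =
  (batchId + 1) % compactInterval == 0

assert(isCompactionBatch(9))   // batch 9 wrote 9.compact
assert(isCompactionBatch(19))  // batch 19 must read 9.compact plus deltas 10..18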

Hence I tried: spark.conf.set("spark.sql.streaming.fileSink.log.cleanupDelay", 60000) to make sure metadata from old batches is not deleted before a new batch creates the combined metadata file.
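Two related knobs sit next to that one in Spark's SQLConf; shown here as a sketch in the same session, with illustrative values (these are internal settings, so treat them as a diagnostic aid rather than a fix):

// Illustrative values; both keys live alongside cleanupDelay in SQLConf.
spark.conf.set("spark.sql.streaming.fileSink.log.compactInterval", 10)  // batches between compactions
spark.conf.set("spark.sql.streaming.fileSink.log.deletion", false)      // keep expired log files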

What makes this error annoying is that it is not always reproducible. Without changing a single character in the code, writing to Parquet sometimes works and sometimes does not. I have tried cleaning the checkpoint location, the Spark/HDFS logs, and so on, in case some internal Spark "state" was causing the problem.

Here is the error stack trace:

query: org.apache.spark.sql.streaming.StreamingQuery = org.apache.spark.sql.execution.streaming.StreamingQueryWrapper@56122c1

18/04/09 20:20:04 ERROR FileFormatWriter: Aborting job null.
java.lang.IllegalStateException: history/1523305060336/_spark_metadata/9.compact doesn't exist when compacting batch 19 (compactInterval: 10)
        at org.apache.spark.sql.execution.streaming.CompactibleFileStreamLog$$anonfun$4$$anonfun$apply$1.apply(CompactibleFileStreamLog.scala:174)
        at org.apache.spark.sql.execution.streaming.CompactibleFileStreamLog$$anonfun$4$$anonfun$apply$1.apply(CompactibleFileStreamLog.scala:174)
        at scala.Option.getOrElse(Option.scala:121)
        at org.apache.spark.sql.execution.streaming.CompactibleFileStreamLog$$anonfun$4.apply(CompactibleFileStreamLog.scala:173)
        at org.apache.spark.sql.execution.streaming.CompactibleFileStreamLog$$anonfun$4.apply(CompactibleFileStreamLog.scala:172)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
        at scala.collection.immutable.NumericRange.foreach(NumericRange.scala:73)
        at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
        at scala.collection.AbstractTraversable.map(Traversable.scala:104)
        at org.apache.spark.sql.execution.streaming.CompactibleFileStreamLog.compact(CompactibleFileStreamLog.scala:172)
        at org.apache.spark.sql.execution.streaming.CompactibleFileStreamLog.add(CompactibleFileStreamLog.scala:156)
        at org.apache.spark.sql.execution.streaming.ManifestFileCommitProtocol.commitJob(ManifestFileCommitProtocol.scala:64)
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply$mcV$sp(FileFormatWriter.scala:207)
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:166)
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:166)
        at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:65)
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:166)
        at org.apache.spark.sql.execution.streaming.FileStreamSink.addBatch(FileStreamSink.scala:123)
        at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatch$1.apply$mcV$sp(StreamExecution.scala:666)
        at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatch$1.apply(StreamExecution.scala:666)
        at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatch$1.apply(StreamExecution.scala:666)
        at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:279)
        at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
        at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runBatch(StreamExecution.scala:665)
        at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(StreamExecution.scala:306)
        at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1$$anonfun$apply$mcZ$sp$1.apply(StreamExecution.scala:294)
        at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1$$anonfun$apply$mcZ$sp$1.apply(StreamExecution.scala:294)
        at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:279)
        at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
        at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1.apply$mcZ$sp(StreamExecution.scala:294)
        at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
        at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches(StreamExecution.scala:290)
        at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:206)
18/04/09 20:20:04 ERROR StreamExecution: Query [id = 5251fe93-2b6b-4dff-bec3-7801dc7e6417, runId = 083547c1-69b7-40e7-8bf9-3c3af11d4c31] terminated with error
org.apache.spark.SparkException: Job aborted.
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply$mcV$sp(FileFormatWriter.scala:213)
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:166)
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:166)
        at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:65)
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:166)
        at org.apache.spark.sql.execution.streaming.FileStreamSink.addBatch(FileStreamSink.scala:123)
        at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatch$1.apply$mcV$sp(StreamExecution.scala:666)
        at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatch$1.apply(StreamExecution.scala:666)
        at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatch$1.apply(StreamExecution.scala:666)
        at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:279)
        at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
        at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runBatch(StreamExecution.scala:665)
        at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(StreamExecution.scala:306)
        at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1$$anonfun$apply$mcZ$sp$1.apply(StreamExecution.scala:294)
        at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1$$anonfun$apply$mcZ$sp$1.apply(StreamExecution.scala:294)
        at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:279)
        at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
        at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1.apply$mcZ$sp(StreamExecution.scala:294)
        at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
        at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches(StreamExecution.scala:290)
        at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:206)
Caused by: java.lang.IllegalStateException: history/1523305060336/_spark_metadata/9.compact doesn't exist when compacting batch 19 (compactInterval: 10)
        at org.apache.spark.sql.execution.streaming.CompactibleFileStreamLog$$anonfun$4$$anonfun$apply$1.apply(CompactibleFileStreamLog.scala:174)
        at org.apache.spark.sql.execution.streaming.CompactibleFileStreamLog$$anonfun$4$$anonfun$apply$1.apply(CompactibleFileStreamLog.scala:174)
        at scala.Option.getOrElse(Option.scala:121)
        at org.apache.spark.sql.execution.streaming.CompactibleFileStreamLog$$anonfun$4.apply(CompactibleFileStreamLog.scala:173)
        at org.apache.spark.sql.execution.streaming.CompactibleFileStreamLog$$anonfun$4.apply(CompactibleFileStreamLog.scala:172)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
        at scala.collection.immutable.NumericRange.foreach(NumericRange.scala:73)
        at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
        at scala.collection.AbstractTraversable.map(Traversable.scala:104)
        at org.apache.spark.sql.execution.streaming.CompactibleFileStreamLog.compact(CompactibleFileStreamLog.scala:172)
        at org.apache.spark.sql.execution.streaming.CompactibleFileStreamLog.add(CompactibleFileStreamLog.scala:156)
        at org.apache.spark.sql.execution.streaming.ManifestFileCommitProtocol.commitJob(ManifestFileCommitProtocol.scala:64)
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply$mcV$sp(FileFormatWriter.scala:207)
        ... 20 more

3 Answers

谢鸿飞
2023-03-14

This error message can appear when the file sink path has changed between restarts of the Structured Streaming job, or when the output path's file system is unreliable and data may be lost between restarts.

If you have changed the file sink output path, you can copy the missing compact file from

/old/file/sink/path/_spark_metadata/9.compact

to

/new/file/sink/path/_spark_metadata/9.compact

I have provided more background in another answer.
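As an illustration of that copy (the two paths are the placeholders above; substitute your real sink locations), one way to do it from spark-shell is the Hadoop FileSystem API:

import org.apache.hadoop.fs.{FileUtil, Path}

// Placeholder paths from above; substitute the real old/new sink locations.
val conf = spark.sparkContext.hadoopConfiguration
val src  = new Path("/old/file/sink/path/_spark_metadata/9.compact")
val dst  = new Path("/new/file/sink/path/_spark_metadata/9.compact")

// false = do not delete the source file after copying.
FileUtil.copy(src.getFileSystem(conf), src, dst.getFileSystem(conf), dst, false, conf)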

赫连瀚
2023-03-14

I resolved the problem by clearing my checkpoint path:


• Delete your checkpoint path:

  sudo -u hdfs hdfs dfs -rmr ${your_checkpoint_path}

• Resubmit your Spark job.

归德厚
2023-03-14

It turns out that S3 does not support the read-after-write semantics that Spark checkpointing requires.

This article suggests using AWS EFS for checkpointing.

S3 is still a fine place to read data from and write data to.
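A minimal sketch of that split, assuming the question's session and imports: the checkpoint goes to a consistent file system (cluster HDFS here; an EFS mount addressed via a file:// path would look the same), while the data itself still lands on S3. All paths are placeholders:

// All paths are placeholders; the point is only where each location lives.
val query = outputdf.writeStream
  .format("parquet")
  .option("path", "s3://my-bucket/events/")                    // data on S3
  .option("checkpointLocation", "hdfs:///checkpoints/ingest")  // checkpoint off S3
  .trigger(Trigger.ProcessingTime(10.seconds))
  .outputMode(OutputMode.Append)
  .start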
