当前位置: 首页 > 知识库问答 >
问题:

非法状态异常:压缩批次9时,_spark_metadata/0不存在

楚翰
2023-03-14

我们有使用Spark结构化流实现的流应用程序,它试图从Kafka主题中读取数据,并将其写入HDFS位置。

有时应用程序会因异常而失败:

_spark_metadata/0 doesn't exist while compacting batch 9
java.lang.IllegalStateException: history/1523305060336/_spark_metadata/9.compact doesn't exist when compacting batch 19 (compactInterval: 10)

我们无法解决这个问题。

我找到的唯一解决方案是删除检查点位置文件,这将使作业在我们再次运行应用程序时从头开始读取主题/数据。然而,这对于生产应用程序来说不是一个可行的解决方案。

是否有人有针对此错误的解决方案,而无需删除检查点,以便我可以从上次运行失败的位置继续?

应用程序示例代码:

val df = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", <server list>)
  .option("subscribe", <topic>)
  .load()

[...] // do some processing

dfProcessed.writeStream
  .format("csv")
  .option("format", "append")
  .option("path",hdfsPath)
  .option("checkpointlocation","")
  .outputmode(append)
  .start

共有3个答案

纪佐
2023-03-14

本文介绍了该机制,并提供了一种从Spark结构化流中已删除的_spark_metadata文件夹中恢复的好方法:

https://dev.to/kevinwallimann/how-to-recover-from-a-deleted-sparkmetadata-folder-546j

“创建虚拟日志文件:

如果元数据日志文件不可恢复,我们可以为丢失的微批处理创建虚拟日志文件。在我们的示例中,可以像这样完成:

for i in {0..1}; do echo v1 > "/tmp/destination/_spark_metadata/$i"; done

这将创建文件

/tmp/destination/_spark_metadata/0
/tmp/destination/_spark_metadata/1

现在,可以重新启动查询,并应在没有错误的情况下完成。“

因为我以前的输出文件夹无法再恢复。我尝试了这个虚拟解决方案,它可以摆脱非法状态例外:_spark_metadata/...不存在例外。

闾丘照
2023-03-14

由< code>checkpointLocation引起的错误,因为< code>checkpointLocation存储旧的或已删除的数据信息。您只需删除包含< code>checkpointLocation的文件夹。

了解更多信息:https://kb . databricks . com/streaming/file-sink-streaming . html

例子:

df.writeStream
      .format("parquet")
      .outputMode("append")
      .option("checkpointLocation", "D:/path/dir/checkpointLocation")
      .option("path", "D:/path/dir/output")
      .trigger(Trigger.ProcessingTime("5 seconds"))
      .start()
      .awaitTermination()

您需要删除目录<code>checkpointLocation</code>。

陶博涉
2023-03-14

错误消息

_spark_metadata/n.compact doesn't exist when compacting batch n+10

可以在您

  • 将一些数据处理到启用了检查点的FileSink中,然后
  • 停止你的流媒体工作,然后
  • 更改FileSink的输出目录,同时保持相同的检查点位置,然后
  • 重启流作业

只需删除checkpointLocation中的文件并重新启动应用程序。

因为您不想删除检查点文件,所以您可以简单地将丢失的spark元数据文件从旧的文件接收器输出路径复制到新的输出路径。见下文了解什么是“缺失的spark元数据文件”。

为了理解为什么会抛出这个<code>IllegalStateException</code>,我们需要了解所提供的文件输出路径中的幕后情况。让outPathBefore作为此路径的名称。当您的流作业正在运行并处理数据时,该作业将创建一个文件夹outPathBefore/_spark_metadata。在该文件夹中,您将找到一个以微批标识符命名的文件,其中包含数据已写入的文件(分区文件)列表,例如:

/home/mike/outPathBefore/_spark_metadata$ ls
0 1 2 3 4 5 6 7

在这种情况下,我们有8个微批次的详细信息。其中一个文件的内容如下所示

/home/mike/outPathBefore/_spark_metadata$ cat 0
v1
{"path":"file:///tmp/file/before/part-00000-99bdc705-70a2-410f-92ff-7ca9c369c58b-c000.csv","size":2287,"isDir":false,"modificationTime":1616075186000,"blockReplication":1,"blockSize":33554432,"action":"add"}

默认情况下,在每十个微批处理中,这些文件被压缩,这意味着文件0、1、2,...,9将存储在名为< code>9.compact的压缩文件中。

过程连续进行随后的十个批次,即在微批次19中,作业聚合9.compact、10、11、12、…、19的最后10个文件。

现在,假设您让流式作业一直运行到微批处理15,这意味着该作业创建了以下文件:

/home/mike/outPathBefore/_spark_metadata/0
/home/mike/outPathBefore/_spark_metadata/1
...
/home/mike/outPathBefore/_spark_metadata/8
/home/mike/outPathBefore/_spark_metadata/9.compact
/home/mike/outPathBefore/_spark_metadata/10
...
/home/mike/outPathBefore/_spark_metadata/15

在第15个微批处理之后,您停止了流作业,并将文件接收器的输出路径更改为,比如说< code > outthafter 。由于您保持相同的检查点位置,流式作业将通过微批处理16继续。但是,它现在在新的输出路径中创建元数据文件:

/home/mike/outPathAfter/_spark_metadata/16
/home/mike/outPathAfter/_spark_metadata/17
...

现在,这是抛出异常的地方:当到达微批处理19时,作业试图压缩火花元数据文件夹中的第十个最新文件。但是,它只能找到文件16、17、18,但找不到9.compact、10等。因此错误消息如下:

java.lang.IllegalStateException: history/1523305060336/_spark_metadata/9.compact doesn't exist when compacting batch 19 (compactInterval: 10)

结构化流式编程指南解释了流式查询中更改后的恢复语义:

"不允许更改文件接收器的输出目录:sdf.writeStream.format("parket"). ption("path","/xxPath")到sdf.writeStream.format("parket"). ption("path","/antherPath")"

Databricks还在文章《使用文件接收器传输:更改检查点或输出目录时的恢复问题》中写了一些细节

 类似资料:
  • 我正在尝试使用下面的快速加载API 连接…等是完美的。 我确切地知道它在哪里失败 例外情况是 < code >线程“main”Java . lang . illegalstateexception中出现异常:示例失败。 这是我试图上传的表格。它是格式,当我通过记事本打开它时,它看起来像这样 为什么我会得到这个异常?我该如何改进?据我理解问题是< code > pstmtfld . setascii

  • 问题内容: 如何将轮询线程传递给另一个线程进行处理。程序执行在具有主方法和线程池的控制器类中: 主类控制器 具有轮询类的线程的方法 具有proc类的线程的方法 轮询类和控制器类 我的任务和问题是: 1.控制器应同时处理轮询器和处理器线程,并且应仅调用轮询器和处理器线程 2.现在我的问题是如何使轮询线程等待3秒并并行通知处理器。 我得到如下错误: 这里如何实现异步处理? 问题答案: 你需要阅读的东西

  • 问题内容: 这是我的用法- 另外,我在http GET周围放置了一个finally块- 这是我的堆栈跟踪- 我正在使用Quartz计划监视Http端点的工作。这是我的连接池配置 Maven依赖..工件版本 编辑 -好吧,通过不关闭finally块中的CloseableHttpClient,问题解决了。有人能说出为什么这样吗? 如果关闭客户端,为什么连接池会关闭? 是上面的closeablehttp

  • 这是我如何使用它 - 此外,我已经在超文本传输协议GET周围放置了一个最终块- 这是我的堆栈跟踪- 我正在使用Quartz来安排监控Httpendpoint的工作…这是我的连接池配置 马文依赖..神器版本 编辑-嗯,这个问题通过在最后一个块中不关闭CloseableHttp客户端而得到解决…有人能告诉我为什么它会这样吗?如果我关闭客户端,为什么连接池会关闭? 上面的closeablehttpcli

  • 我的应用程序处于生产状态,它支持从API 8到23。我最近更新了应用程序,使其具有使用导航抽屉的材料设计。此版本支持从 14 到 23。 该应用程序在API 21[Lollipop]及以上版本上运行良好,但有4.4“java illegalstateexception android.support.v7.app.AppCompatDelegateImplV7.createSubDecor”的崩溃

  • 我在MediaCodec上得到了非法状态例外。configure()行,我正在尝试使用MediaCodec录制音频。这只发生在一些手机上,在标签上一切正常。这个特别的碰撞示例来自三星Galaxy S4。异常跟踪: 音频格式声明: 音频编码器初始化: 有人知道那可能是什么吗?奇怪的是,它只发生在某些设备上。欢迎提出任何建议!