当前位置: 首页 > 知识库问答 >
问题:

spark流式处理失败的批处理

夏才
2023-03-14

我在spark streaming应用程序中看到一些失败的批处理,原因是与内存相关的问题,如

无法计算拆分,找不到块输入-0-1464774108087

共有1个答案

季森
2023-03-14

在spark中的数据摄取率高于已分配或可保留的内存的情况下,可能会发生这种情况。您可以尝试将StorageLevel更改为memory_and_disk_ser,这样当内存不足时,Spark会将数据溢出到磁盘。这将防止您的错误。

此外,我认为这个错误并不意味着在处理过程中丢失了任何数据,而是块管理器添加的输入块在处理开始之前超时了。

在Spark用户列表上检查类似的问题。

编辑:

数据并没有丢失,只是没有出现在任务所期望的位置。根据Spark文档:

您可以使用其上的persist()或cache()方法标记RDD以持久化。第一次在操作中计算它时,它将保留在节点的内存中。Spark的缓存是容错的--如果RDD的任何分区丢失,它将自动使用最初创建它的转换重新计算。

 类似资料:
  • 11.1 日志项处理和失败 一个常见的用例是需要在一个步骤中特殊处理错误,chunk-oriented步骤(从创建工厂bean的这个步骤)允许用户实现一个简单的ItemReadListener用例,用来监听读入错误,和一个ItemWriteListener,用来监听写出错误.下面的代码片段说明一个监听器监听失败日志的读写: >public class ItemFailureLoggerListen

  • 我对spark streaming有两个问题: < li >我有一个spark流应用程序正在运行,并以< code>20秒的批处理间隔收集数据,在< code>4000个批处理中,有< code>18个批处理因异常而失败: 无法计算拆分,块输入-0-1464774108087 未找到 我假设此时数据大小大于spark可用内存,并且应用程序< code>StorageLevel为< code>MEM

  • 我在中看到了几个答案(例如这里),因此建议批次中的记录将成为单个RDD。我对此表示怀疑,因为假设batchInterval为1分钟,那么单个RDD将包含最后一分钟的所有数据? 注意:我不是直接将批次与RDD进行比较,而是将Spark内部处理的批次进行比较。

  • 在 Spark 流式处理中,如何检测空批次? 让我们以有状态流式处理字数为例:https://github.com/apache/spark/blob/master/examples/src/main/java/org/apache/spark/examples/streaming/JavaStatefulNetworkWordCount.java。是否可以仅在将新单词添加到流中时才打印字数RDD

  • 类项目: hbm文件: 方法如下:

  • 我有以下工作要处理在一定的时间间隔或特别的基础上。 作业中的步骤如下: 我也想要用户界面,在那里我可以触发一个特别的基础上的工作,而且我应该能够提供参数从用户界面。 我想用Spring batch来完成这个任务,但它更多的是用于读->处理->写之类的工作。这里,在第一步中,我正在生成由第二步读取的数据。我不确定我是否还可以使用Spring batch来实现这个,或者有更好的方法来实现这个。