当前位置: 首页 > 知识库问答 >
问题:

Spark-Kafka-Streaming:偏移管理-无法手动提交工作(Java)

郗俊能
2023-03-14

我们正在使用JavaInputDStream

我们按照Spark Streaming-Kafka集成指南中的描述实施了偏移管理,但现在我们刚刚意识到偏移管理不适合我们,并且如果当前小批量中出现故障,Stream不会再次读取消息。即使我们跳过这一行,它也不会再次读取消息:

((CanCommitOffsets) stream.inputDStream()).commitAsync(offsetRanges);

我们将代码分解为以下内容,并期望流最终在循环中一次又一次地读取相同的消息,但事实并非如此:

stream.foreachRDD(recordRDD -> {
   final OffsetRange[] offsetRanges = ((HasOffsetRanges) recordRDD.rdd()).offsetRanges();
   if (!recordRDD.isEmpty()) {
      LOGGER.info("Processing some Data: " + recordRDD.rdd().count());
   }
});

消费者配置参数enable.auto.commit设置为false,初始化JavaInputDStream后也会显示在日志中。我们在测试中的嵌入式Kafka Broker和开发阶段的Kafka-Server都面临同样的问题。目前两者都以独立模式运行。

我们尝试的是:

  • 代理配置:增加 offsets.commit.timeout.ms
  • 使用者/流配置:将隔离级别设置为“read_committed”
  • 使用者/流配置:将自动偏移量重置设置为最早
  • 火花:将火花、流媒体、非垃圾邮件设置为虚假
  • 火花:增加火花的值
  • 流:将流相位持续时间调整为比迷你批处理更长的时间
  • 流:启用检查点
  • 流:更改的位置策略

所有这些都不起作用,看起来我们搜索了整个网络,却没有找到问题的原因。看起来流忽略了enable.auto.commit配置,只是在阅读了当前RDD的消息后提交。无论我们尝试什么,我们的流都只读取每条消息一次。

我遗漏了什么不同的方法或事实吗?

共有1个答案

赫连卓
2023-03-14

经过更多的测试,我们发现只有在实际批处理期间流停止/崩溃时,手动提交才有效。如果流停止并重新启动,它将再次消耗失败的数据。

因此,我们目前所做的是,每当我们检测到故障时,直接停止流 JavaStreamContext.stop(假)。在此之后,计划程序将再次启动流,调度程序会验证流是否在一段规律的时间内处于活动状态,如果不是,则启动它。

这不是一个优雅的解决方案,但它首先适用于我们。

 类似资料:
  • 我目前正在从具有特定偏移量的主题中获取消息。我正在使用寻求()来实现它。但是当我将enable.auto.commit设置为true或使用手动同步(委托同步()/委托同步())时,Seek()不起作用,因为它没有轮询来自特定偏移量的消息,而是从最后提交的偏移量中选择。 因此,在使用Seek()时,是否必须将偏移量存储在外部DB中,而不提交给Kafka?Seek和Commit不能并行工作吗? 客户端

  • 我使用的是camel kafka组件,我不清楚在提交补偿时引擎盖下发生了什么。如下所示,我正在聚合记录,我认为对于我的用例来说,只有在记录保存到SFTP后提交偏移量才有意义。 是否可以手动控制何时可以执行提交?

  • 我正在研究为Spark结构化流在kafka中存储kafka偏移量,就像它为DStreams工作一样,除了结构化流,我也在研究同样的情况。是否支持结构化流?如果是,我如何实现? 我知道使用进行hdfs检查点,但我对内置的偏移量管理感兴趣。 我期待Kafka存储偏移量只在内部没有火花hdfs检查点。

  • 我正在实现spring kafka批处理侦听器,它读取来自kafka主题的消息列表,并将数据发布到REST服务。我想了解在REST服务停止的情况下的偏移管理,不应该提交批处理的偏移,应该为下一次轮询处理消息。我已经阅读了spring kafka文档,但在理解侦听器错误处理程序和批量查找当前容器错误处理程序之间的区别时存在困惑。我使用的是spring-boot-2.0.0。M7及以下版本是我的代码。

  • 我是否可以按以下方式使用Spring Cloud Steam实现手动Kafka偏移管理: 每当我的使用者处理消息时,它都会将其偏移量提交到DB中。不喜欢Kafka 每当我的使用者重新启动时,它就从数据库中读取上次处理的偏移量,查找该偏移量并开始处理下一条消息。