当前位置: 首页 > 知识库问答 >
问题:

如何在Spark Kafka直接流中手动提交偏移?

太叔富
2023-03-14

共有1个答案

段干宾白
2023-03-14

下面的文章可能是理解这种方法的一个很好的开始。

火花-Kafka-实现-零数据丢失

更进一步,

 类似资料:
  • 我有一个Kafka接收器任务,通过方法收听Kafka主题 但我不想自动提交偏移量,因为一旦从Kafka取出记录,我就有一些处理逻辑 从Kafka获取记录后,如果处理成功,则只有我想提交偏移量,否则它应该再次从同一偏移量读取。 我可以在Kafka consumer中看到方法,但在中找不到替代方法。

  • 我使用的是camel kafka组件,我不清楚在提交补偿时引擎盖下发生了什么。如下所示,我正在聚合记录,我认为对于我的用例来说,只有在记录保存到SFTP后提交偏移量才有意义。 是否可以手动控制何时可以执行提交?

  • 我第一次使用Spring Kafka,我无法在我的消费者代码中使用Acknowledgement.acknowledge()方法进行手动提交。请让我知道我的消费者配置或侦听器代码中是否缺少任何内容。或者有其他方法可以根据条件处理确认偏移。在这里,我正在寻找解决方案,例如如果偏移没有手动提交/确认,它应该由消费者选择相同的消息/偏移量。 配置 听众

  • 嗨,我们一直在使用旧的spring版本和kafka 1.1,并有以下依赖项 我在应用程序中有以下配置。yaml文件 我在stackoverflow中找到了链接。答案有 首先,我不确定您对SpringApplication.ext(applicationContext,())的期望是什么- 这是否意味着Kafka不知道消费者已关闭,在这种情况下没有来自消费者的通信。我认为会发生重新平衡,此外,由于k

  • 我目前正在从具有特定偏移量的主题中获取消息。我正在使用寻求()来实现它。但是当我将enable.auto.commit设置为true或使用手动同步(委托同步()/委托同步())时,Seek()不起作用,因为它没有轮询来自特定偏移量的消息,而是从最后提交的偏移量中选择。 因此,在使用Seek()时,是否必须将偏移量存储在外部DB中,而不提交给Kafka?Seek和Commit不能并行工作吗? 客户端

  • 我正在尝试找出使用Spring-Kafka(1.1.0. RELEASE)在Kafka消费者中手动提交偏移的方法。我明白,最好将这些偏移提交给健壮的客户端实现,这样其他消费者就不会处理重复的事件,这些事件最初可能是由现已死亡的消费者处理的,或者因为重新平衡被触发了。 我知道有两种方法可以解决这个问题- > 将ACK_MODE设置为MANUAL_IMMEDIATE,并在侦听器实现中调用ack.ack