当前位置: 首页 > 知识库问答 >
问题:

通过拒绝确认重读Kafka主题的信息

薛利
2023-03-14

我正在使用spring-integration-kafka实现带有自定义确认机制的Kafka消费者。

但我面临一个问题:即使确认没有发送给Kafka,消费者仍然知道下一条消息的偏移量,并且继续阅读新消息,尽管偏移量主题中的偏移量值保持不变。

如何使用户在某些故障的情况下重读消息?

共有1个答案

郎雅昶
2023-03-14

当前不支持重新获取失败消息

您可以做的一件事是在消息驱动适配器的下游添加重试(例如,使用请求处理程序重试建议)。

通过不加顶,消息将在重新启动后传递,而不是在当前实例化期间传递。

编辑

现在有一个SeekToCurrEnterrorHandler。

 类似资料:
  • 在我们的docker-swarm中运行kafka connect,使用以下撰写文件: kafka connect节点成功启动,我可以设置任务并查看这些任务的状态······ 我是否在撰写文件或任务配置中缺少某些配置?

  • 我们想通过spring-kafka列出所有Kafka主题,以获得类似于kafka命令的结果: 在下面的服务中运行 getTopics() 方法时,我们会得到 配置: 服务: Kafka已经启动并运行,我们可以成功地从应用程序向主题发送消息。

  • 所以我设置了一个汇流Kafka JDBC连接器。首先,我启动一个模式注册表,如 这是schema-registery.properties文件 接下来,我启动一个像这样的独立连接器 connect-avro-standalone.properties是 jdbc-source.properties是 我使用的查询只是为了测试的目的,我要使用的真正查询将实现增量模式,并且将不包含where子句。 我

  • 在阅读Kafka主题时,我得到了奇怪的ArrayIndexOutOfBoundsException。花了很多时间却搞不清问题所在。有人能在这方面提供帮助/建议吗。这是我的日志。

  • 我试图了解如何跟踪Kafka的信息摄取。 我们现在遵循的工作流程是清除主题中的所有消息,然后我们用代码更改重新摄取。我需要知道那些代码更改有多成功。在当前状态下,我正在使用Kafka工具,手动刷新消息总数,并将结果保存在csv中,我知道这是不可持续的长期。 你对自动获取Kafka主题中的消息计数有什么建议?理想情况下,我想击中的主题一分钟一分钟的频率,并得到计数,以及窗口的时间,如1天等。

  • 我有一个SNS主题&订阅(实际上不止1个)设置来使用SQS DLQ。然而,每一个都告诉我,我有一个策略错误。 我的SNS订阅设置了DLQ: 我的队列存在: 我还尝试在队列上使用一个真正通用的访问策略: 我遵循的是:https://docs.aws.amazon.com/sns/latest/dg/sns-configure-dead-letter-queue.html(步骤5解释了设置策略) 其他