当前位置: 首页 > 知识库问答 >
问题:

使用DeadLetterPublishingRecoverer和手动确认模式的Spring Kafka侦听器

奚瑾瑜
2023-03-14

我很难理解如何解决这个问题,所以我在这里问这个问题,希望其他人已经面临同样的问题。我们正在以手动确认模式运行@KafkaListener,死信恢复程序的重试限制为3。由于业务逻辑,在特定情况下(外部依赖),我们不确认消息并暂停消费5分钟,因此需要手动确认模式。

此外,我们确实需要由于某种原因无法处理的消息的死信队列。

现在,在手动确认模式下的问题是,当侦听器/消费者达到重试限制并将其移动到dl队列时,他不会确认消息。

如果消费者服务将重新启动,他会尝试一次又一次地使用消息,并将它们移动到dl队列。

有没有办法解决这个问题?

来自汉堡的感谢和问候!

共有2个答案

毛弘博
2023-03-14

在提供给ContainerListenerFactory的SeekToCurInterrorHandler实例上,可以将commitRecovered设置为true。

请在这里留档

public void setCommitRecovered(布尔commitRecovered)

设置为true以提交已恢复记录的偏移量。容器必须是

姜羽
2023-03-14

如果可能的话,我会尽量避免使用手动确认;也许通过增加max.poll。间隔而是ms

如果您使用AckMode。MANUAL_IMMEDIATE,直接在错误处理程序中的消费者上执行提交是安全的。

子类的SeekToMONtErrorHandler和覆盖句柄(),如果super.handle()没有抛出异常,这意味着重试被超过,你可以提交消费者上的偏移量。

 类似资料:
  • 我有一个springboot应用程序,它侦听Kafka流并将记录发送到某个服务以进行进一步处理。服务有时可能会失败。注释中提到了异常情况。到目前为止,我自己模拟了服务成功和异常场景。 侦听器代码: 用户工厂配置如下: 由于REST服务正在抛出RestClientException,它应该进入上面提到的if块。关于FixedBackOff,我不希望SeekToCurrentErrorHandler执

  • 我需要使用consume process Product模式来处理Kafka消息,并已使用Kafka事务管理器配置了Spring Kafka侦听器容器,还设置了事务id前缀以启用Kafka事务。我正在使用批处理的ack模式,并试图了解在这种模式下,在事务中何时提交偏移量。文档似乎表明,一旦使用了轮询中的所有记录,ack模式批提交偏移量——在事务上下文中也是这样吗,即每个轮询1个事务? 或者,在使用

  • 我使用JMS从IBM MQ message Broker发送接收消息。我目前正在处理侦听器服务,抛出未处理的异常和消息发送回队列而没有确认。我希望服务重试一个可配置的次数,并抛出表示监听器服务不可用的完全异常消息。 我的监听器和容器工厂如下所示。

  • 我正在用Java实现Akka-Alpakka,用于从ActiveMQ队列消费和生成队列。我可以成功地从队列中消费,但还不能实现应用程序级消息确认。 我的目标是使用队列中的消息,并将它们发送给另一个参与者进行处理。当该角色完成处理后,我希望它能够控制ActiveMQ中的消息确认。这大概是通过向另一个可以进行确认的参与者发送消息、调用消息本身的确认函数或其他方式来完成的。 在我的测试中,两条消息被放入

  • 问题内容: 每当用户在其android设备上插入错误的锁定模式时,我都试图找到一种接收消息的方法。老实说,我对如何实现这一目标一无所知,但我想我应该在注册为侦听器的后台提供某种服务。但是我应该在哪个广播员上注册我的收听者? 很抱歉,我没有合适的术语,可能我的文字没有多大意义……但是基本上,我需要构建一个小类,每当插入错误的锁定模式时,该类就执行一个动作。 我并不需要完整的代码,我只需要一个小例子t

  • 问题内容: 有人知道在python中跟踪字典对象更改的任何简便方法吗?我的工作水平很高,所以我有一些方法可以处理更改字典的操作,如果字典发生更改,我想调用一个函数来基本上执行Observer / Notify。 我要避免的是所有跟踪(设置布尔值)代码。希望有一种更轻松的方式来跟踪更改。这是一个简单的情况,但是可能存在更复杂的逻辑,这将导致我不得不设置更改的标志。 问题答案: 您可以从该类派生并在任