我正在使用Spring Cloud Stream和Kafka Binder批量消费来自一个Kafka主题的消息。我正在尝试实现一个错误处理机制。根据我的理解,我不能在批处理模式下使用Spring Cloud Stream的< code>enableDLQ属性。
我找到了RecoveryBatchErrorHandler
和DeadLetterPublishingRecoverer
,以重试并从spring-kafka文档发送失败消息。但我无法理解如何按照功能编程标准将记录发送到自定义DLQ主题。我看到的所有例子都是使用KafkaTemplates。
有什么好的例子可以让我找到实现吗?
这就是我一直提到的Spring文档。
https://docs . spring . io/spring-Kafka/docs/2 . 5 . 12 . release/reference/html/# recoverying-batch-eh
https://spring.io/projects/spring-kafka#support OSS不再支持该版本
对于当前版本,使用配置有DeadLetterPublishingRecover
的DefaultErrorHandler
,并抛出BatchListenerExceptionFailedException
来告诉框架批处理中的哪个记录失败。
看见https://docs.spring.io/spring-kafka/docs/current/reference/html/#annotation-错误处理和https://docs.spring.io/spring-kafka/docs/current/reference/html/#dead-字母和https://docs.spring.io/spring-kafka/docs/current/reference/html/#legacy-嗯
添加侦听器容器定制器 bean,将配置的错误处理程序添加到侦听器容器
。
在我开始使用Spring Cloud Stream之前,我使用的是Spring-Kafka及其对批量消费和自定义错误处理的支持。请注意这段代码的最后两行: 然而,对于Spring Cloud Stream,我找不到如何配置它。我只能找到这些配置属性: Spring、响铃、水流、kafka.bindings.inputconsumer。autoCommitOffset,启用Dlq 因此,在Sprin
Spring Cloud Kafka Streams与Spring Cloud Stream、Spring Cloud Function、Spring AMQP和Spring for Apache Kafka有什么区别?
我用的是Apache Kafka 2.7.0和Spring Cloud Stream Kafka Streams。 在我的Spring Cloud Stream (Kafka Streams)应用程序中,我已经将我的application.yml配置为当输入主题中的消息出现反序列化错误时使用sendToDlq机制: 我启动了我的应用程序,但我看不到这个主题存在。文档指出,如果 DLQ 主题不存在,
我们在Spring Cloud Stream上有一个应用程序,它与Project React集成在一起。我们通过在消息标题中设置spring.cloud.stream.sendto.destination来动态设置目标主题并发布消息。我们正在寻找处理错误的情况下,如Kafka服务器断断续续或主题不可用而发布。我们已经实现了@ServiceActivator来处理所有错误。动态设置主题时,Servi
我用Spring云溪和Kafka溪。假设我有一个处理器,它的功能是将KStream字符串转换为KStream CityProgrammes。它调用一个API来根据名称查找城市,并调用另一个转换来查找该城市附近的任何事件。 现在的问题是,任何错误发生在转换期间,整个应用程序停止。我想把一个特定的消息发送给DLQ,然后继续前进。我已经读了几天了,每个人都建议在被调用的服务中处理错误,但在我看来这是一个
我正试图按照GitHub的建议设置测试 其中StreamProcessor设置为 -->line从不使用在我看来应该在主题“output”上的消息,因为@StreamProcessor有@Sendto(“output”) 我希望能够测试流处理的消息。