我需要在Spring云流kafka活页夹错误处理场景的帮助。我的应用程序有一个java 8消费者,其绑定在application.yaml中指定。
@Bean
public Consumer<Message<Transaction>> doProcess() {
return message -> {
Transaction transaction = message.getPayload();
if(true) {
throw new RuntimeException("exception!! !!:)");
}
Acknowledgment acknowledgment = message.getHeaders().get(KafkaHeaders.ACKNOWLEDGMENT,
Acknowledgment.class);
if (acknowledgment != null) {
System.out.println("Acknowledgment provided");
acknowledgment.acknowledge();
}
}
}
application.yaml:
spring.application.name: appname
spring.cloud.stream:
function.definition: doProcess
kafka:
default.consumer:
startOffset: latest
useNativeDecoding: true
bindings:
input.consumer.autoCommitOffset: false
bindings:
doProcess-in-0:
destination: kafka.input.topic.name
group: appGroup
content-type: application/*+avro
consumer:
autoCommitOffset: false.
现在,我正在处理错误,有两个问题:
>
我正在尝试手动包装消息的消费,而不是使用自动提交偏移设置为真。因此,当我将自动提交偏移设置设为假并测试错误场景时,会面临奇怪的行为,即每当抛出异常时,消息都会重试n次,即使在服务重新启动后也会发生失败消息的重试/重新传递(如果在n次重试完成之前重新启动)。一旦n次重试完成,即使在服务重新启动后也不会选择消息。那么这是否意味着,消费者在n次重试/重新传递消息后提交偏移量,这不应该是这种情况,因为自动提交偏移是错误的。
注意:我尚未配置任何 dlq。
我们需要编写自定义异常处理程序,在那里我们可以捕获异常(应用程序代码和框架中的错误)并通过AWS环境中的电子邮件向用户组发送通知。但是,我们无法找到任何可以捕获这两种类型异常的错误处理程序。比如扩展SeekTo货币错误处理程序或任何其他可以在错误事件上调用的侦听器。
编辑:
根据 Gary 提供的解决方案,我们可以使用以下 bean 来配置自定义错误处理程序:
@Bean
public ListenerContainerCustomizer<AbstractMessageListenerContainer> MQLCC() {
System.out.println(String.format("DEBUG: Bean %s has bean created.", "MQLCC"));
return new ListenerContainerCustomizerCustom ();
}
private static class ListenerContainerCustomizerCustom implements ListenerContainerCustomizer<AbstractMessageListenerContainer> {
@Override
public void configure(AbstractMessageListenerContainer container, String destinationName, String group) {
System.out.println(String.format("HELLO from container %s, destination: %s, group: %s", container, destinationName, group));
}
}
侦听器容器中的默认错误处理程序将重试10次,然后记录错误并丢弃记录;对于不同的行为,您需要配置自定义错误处理程序和恢复策略。使用ListenerContainerCustomizer
bean来配置容器。
请参阅https://docs . spring . io/spring-Kafka/docs/current/reference/html/# default-eh和https://docs . spring . io/spring-Kafka/docs/current/reference/html/# dead-letters
(3.2及更高版本)
或https://docs.spring.io/spring-kafka/docs/2.7.x/reference/html/#seek-至当前和https://docs.spring.io/spring-kafka/docs/2.7.x/reference/html/#dead-字母
对于早期版本。
我设置了一个Spring云流Kafka制作人和消费者,有3个Kafka经纪人在运行。我已经设置了min.insync。将副本复制到4,以查看生产者错误处理的工作方式。消息通道。send(发送) 以上是我的生产者配置。虽然retries设置为3,但生产者仍会多次重试。虽然sync设置为true,但发送呼叫会立即发出。虽然定义了错误通道和目标,并且将errorChannelEnabled设置为true
大家好,我们使用的是Spring kafka 1.3.3,我们的应用程序是消耗-进程-发布管道。 如果在生产阶段流水线出现任何故障,我们如何处理重试并寻求返回。例如:应用程序正在消耗消息,处理它们并以异步方式发布到另一个主题中。但如果在发布中有任何错误
我们在Spring Cloud Stream上有一个应用程序,它与Project React集成在一起。我们通过在消息标题中设置spring.cloud.stream.sendto.destination来动态设置目标主题并发布消息。我们正在寻找处理错误的情况下,如Kafka服务器断断续续或主题不可用而发布。我们已经实现了@ServiceActivator来处理所有错误。动态设置主题时,Servi
我已经使用Spring云流启动了一个小型微服务。 我只有两个流绑定,如下所示: 我用Serenity开发了组件测试,我将通道注入到我想要发送测试消息的地方: 哪里: 只是定义为字符串常量: 组件测试模块导入依赖项: 我发送的信息如下: 快乐流工作正常。但是,我想在侦听器无法处理消息时测试错误流。 这是一个监听器的例子: 从try/catch引发异常时,错误由服务激活器处理: 在没有Spring-C
我正在开发一个应用程序,在该应用程序中,事件会导致spring data repository保存数据; 此代码可以引发各种异常,如DataIntegrityViolationException(运行时异常)。 处理此类异常和 生成带有导致此错误的有效负载的消息 例外, 允许生产者采取操作。
我用Spring云溪和Kafka溪。假设我有一个处理器,它的功能是将KStream字符串转换为KStream CityProgrammes。它调用一个API来根据名称查找城市,并调用另一个转换来查找该城市附近的任何事件。 现在的问题是,任何错误发生在转换期间,整个应用程序停止。我想把一个特定的消息发送给DLQ,然后继续前进。我已经读了几天了,每个人都建议在被调用的服务中处理错误,但在我看来这是一个